You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/22 22:14:44 UTC

git commit: SAMZA-9; upgrading samza to yarn 2.2.0.

Updated Branches:
  refs/heads/master 2cd18279c -> 4217fc7f9


SAMZA-9; upgrading samza to yarn 2.2.0.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4217fc7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4217fc7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4217fc7f

Branch: refs/heads/master
Commit: 4217fc7f9dd951c2c84b89e06daf8a1b4b0a420f
Parents: 2cd1827
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Oct 22 13:14:25 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Oct 22 13:14:25 2013 -0700

----------------------------------------------------------------------
 build.gradle                                    |   4 -
 gradle.properties                               |   1 -
 gradle/dependency-versions.gradle               |   1 +
 .../apache/samza/job/yarn/ClientHelper.scala    | 102 ++++-------
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  13 +-
 .../job/yarn/SamzaAppMasterLifecycle.scala      |  11 +-
 .../samza/job/yarn/SamzaAppMasterService.scala  |  27 ++-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  66 +++----
 .../apache/samza/job/yarn/YarnAppMaster.scala   |  21 ++-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   1 -
 .../webapp/ApplicationMasterWebServlet.scala    |   3 +-
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |  59 +++----
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 174 +++++++++----------
 .../samza/job/yarn/TestYarnAppMaster.scala      |   1 -
 14 files changed, 234 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1482ea9..f30128f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -104,10 +104,6 @@ project(":samza-serializers_$scalaVersion") {
 project(":samza-yarn_$scalaVersion") {
   apply plugin: 'scala'
 
-  jar {  
-    classifier = "yarn-$yarnVersion"
-  }
-
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index ed6390b..4bfa1c3 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,4 +1,3 @@
 group=org.apache.samza
 version=0.7.0
 scalaVersion=2.9.2
-yarnVersion=2.0.5-alpha

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 5f3fb32..f1f8831 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -9,4 +9,5 @@ ext {
   kafkaVersion = "0.8.1-SNAPSHOT"
   commonsHttpClientVersion = "3.1"
   leveldbVersion = "1.7"
+  yarnVersion = "2.2.0"
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index b2b529e..2339960 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -19,44 +19,37 @@
 
 package org.apache.samza.job.yarn
 
-import java.util.Collections
-
 import scala.collection.JavaConversions._
 import scala.collection.Map
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.api.records.ApplicationReport
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.LocalResource
 import org.apache.hadoop.yarn.api.records.LocalResourceType
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.api.records.YarnApplicationState
-import org.apache.hadoop.yarn.api.ClientRMProtocol
+import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.util.Records
-
+import org.apache.samza.SamzaException
 import org.apache.samza.job.ApplicationStatus
 import org.apache.samza.job.ApplicationStatus.New
 import org.apache.samza.job.ApplicationStatus.Running
 import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
 import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
-
 import grizzled.slf4j.Logging
-import org.apache.samza.SamzaException
+import java.util.Collections
+
+object ClientHelper {
+  val applicationType = "Samza"
+}
 
 /**
  * Client helper class required to submit an application master start script to the resource manager. Also
@@ -64,41 +57,27 @@ import org.apache.samza.SamzaException
  * container and its processes.
  */
 class ClientHelper(conf: Configuration) extends Logging {
-  val rpc = YarnRPC.create(conf)
-  val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
-  info("trying to connect to RM %s" format rmAddress)
-  val applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf).asInstanceOf[ClientRMProtocol]
+  val yarnClient = YarnClient.createYarnClient
+  info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
+  yarnClient.init(conf);
+  yarnClient.start
   var appId: Option[ApplicationId] = None
 
   /**
    * Generate an application and submit it to the resource manager to start an application master
    */
-  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, user: UserGroupInformation, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
-    val newAppRequest = Records.newRecord(classOf[GetNewApplicationRequest])
-    val newAppResponse = applicationsManager.getNewApplication(newAppRequest)
+  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
+    val app = yarnClient.createApplication
+    val newAppResponse = app.getNewApplicationResponse
     var mem = memoryMb
     var cpu = cpuCore
 
-    // If we are asking for memory less than the minimum required, bump it
-    if (mem < newAppResponse.getMinimumResourceCapability().getMemory()) {
-      val min = newAppResponse.getMinimumResourceCapability().getMemory()
-      warn("requesting %s megs of memory, which is less than minimum capability of %s, so using minimum" format (mem, min))
-      mem = min
-    }
-
     // If we are asking for memory more than the max allowed, shout out
     if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
       throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format
         (mem, newAppResponse.getMaximumResourceCapability().getMemory()))
     }
 
-    // if we are asking for cpu less than the minimum required, bump it
-    if (cpu < newAppResponse.getMinimumResourceCapability().getVirtualCores()) {
-      val min = newAppResponse.getMinimumResourceCapability.getVirtualCores()
-      warn("requesting %s virtual cores of cpu, which is less than minimum capability of %s, so using minimum" format (cpu, min))
-      cpu = min
-    }
-
     // If we are asking for cpu more than the max allowed, shout out
     if (cpu > newAppResponse.getMaximumResourceCapability().getVirtualCores()) {
       throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format
@@ -109,10 +88,9 @@ class ClientHelper(conf: Configuration) extends Logging {
 
     info("preparing to request resources for app id %s" format appId.get)
 
-    val appCtx = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val appCtx = app.getApplicationSubmissionContext
     val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
     val resource = Records.newRecord(classOf[Resource])
-    val submitAppRequest = Records.newRecord(classOf[SubmitApplicationRequest])
     val packageResource = Records.newRecord(classOf[LocalResource])
 
     name match {
@@ -144,54 +122,48 @@ class ClientHelper(conf: Configuration) extends Logging {
     info("set memory request to %s for %s" format (mem, appId.get))
     resource.setVirtualCores(cpu)
     info("set cpu core request to %s for %s" format (cpu, appId.get))
-    containerCtx.setResource(resource)
+    appCtx.setResource(resource)
     containerCtx.setCommands(cmds.toList)
     info("set command to %s for %s" format (cmds, appId.get))
     containerCtx.setLocalResources(Collections.singletonMap("__package", packageResource))
     appCtx.setApplicationId(appId.get)
-    info("set app ID to %s" format (user, appId.get))
-    appCtx.setUser(user.getShortUserName)
-    info("set user to %s for %s" format (user, appId.get))
+    info("set app ID to %s" format appId.get)
     appCtx.setAMContainerSpec(containerCtx)
-    submitAppRequest.setApplicationSubmissionContext(appCtx)
+    appCtx.setApplicationType(ClientHelper.applicationType)
     info("submitting application request for %s" format appId.get)
-    applicationsManager.submitApplication(submitAppRequest)
+    yarnClient.submitApplication(appCtx)
     appId
   }
 
   def status(appId: ApplicationId): Option[ApplicationStatus] = {
-    val statusRequest = Records.newRecord(classOf[GetApplicationReportRequest])
-    statusRequest.setApplicationId(appId)
-    val statusResponse = applicationsManager.getApplicationReport(statusRequest)
-    convertState(statusResponse.getApplicationReport)
+    val statusResponse = yarnClient.getApplicationReport(appId)
+    convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
   }
 
   def kill(appId: ApplicationId) {
-    val killRequest = Records.newRecord(classOf[KillApplicationRequest])
-    killRequest.setApplicationId(appId)
-    applicationsManager.forceKillApplication(killRequest)
+    yarnClient.killApplication(appId)
   }
 
   def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = {
-    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
-    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
-
-    getAppsRsp.getApplicationList.filter(appRep => appId.equals(appRep.getApplicationId())).headOption
+    yarnClient
+      .getApplications
+      .filter(appRep => appId.equals(appRep.getApplicationId()))
+      .headOption
   }
 
   def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = {
-    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
-    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
+    val getAppsRsp = yarnClient.getApplications
 
     status match {
-      case Some(status) => getAppsRsp.getApplicationList
-        .filter(appRep => status.equals(convertState(appRep).get)).toList
-      case None => getAppsRsp.getApplicationList.toList
+      case Some(status) => getAppsRsp
+        .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
+        .toList
+      case None => getAppsRsp.toList
     }
   }
 
-  private def convertState(appReport: ApplicationReport): Option[ApplicationStatus] = {
-    (appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus()) match {
+  private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
+    (state, status) match {
       case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
       case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
       case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 7f830f2..1cac06a 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import scala.collection.JavaConversions._
 import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap }
 import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.AMRMClientImpl
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 import org.apache.samza.config.YarnConfig._
 import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
 /**
  * When YARN executes an application master, it needs a bash command to
@@ -44,22 +45,22 @@ import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
  */
 object SamzaAppMaster extends Logging {
   def main(args: Array[String]) {
-    val containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV)
+    val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
     info("got container id: %s" format containerIdStr)
     val containerId = ConverterUtils.toContainerId(containerIdStr)
     val applicationAttemptId = containerId.getApplicationAttemptId
     info("got app attempt id: %s" format applicationAttemptId)
-    val nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV)
+    val nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString)
     info("got node manager host: %s" format nodeHostString)
-    val nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV)
+    val nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString)
     info("got node manager port: %s" format nodePortString)
-    val nodeHttpPortString = System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV)
+    val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
     info("got node manager http port: %s" format nodeHttpPortString)
     val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(YarnConfig.ENV_CONFIG)))
     info("got config: %s" format config)
     val hConfig = new YarnConfiguration
     hConfig.set("fs.http.impl", "samza.util.hadoop.HttpFileSystem")
-    val amClient = new AMRMClientImpl(applicationAttemptId)
+    val amClient = new AMRMClientImpl[ContainerRequest]
     val clientHelper = new ClientHelper(hConfig)
     val registry = new MetricsRegistryMap
     val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
index 95a6f05..5d09265 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.job.yarn
 import grizzled.slf4j.Logging
 import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
  * this means registering and unregistering with the RM, and shutting down
  * when the RM tells us to Reboot.
  */
-class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient[_], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
   var validResourceRequest = true
   var shutdownMessage: String = null
 
@@ -43,15 +43,12 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza
 
     // validate that the YARN cluster can handle our container resource requirements
     val maxCapability = response.getMaximumResourceCapability
-    val minCapability = response.getMinimumResourceCapability
     val maxMem = maxCapability.getMemory
-    val minMem = minCapability.getMemory
     val maxCpu = maxCapability.getVirtualCores
-    val minCpu = minCapability.getVirtualCores
 
-    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, min-mem: %s, max-cpu: %s, min-cpu: %s" format (maxMem, minMem, maxCpu, minCpu))
+    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu))
 
-    if (containerMem < minMem || containerMem > maxMem || containerCpu < minCpu || containerCpu > maxCpu) {
+    if (containerMem > maxMem || containerCpu > maxCpu) {
       shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
       error(shutdownMessage)
       validResourceRequest = false

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index ce3fcc3..82d90d4 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -32,6 +32,9 @@ import org.apache.samza.SamzaException
  * up the web service when initialized.
  */
 class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends YarnAppMasterListener with Logging {
+  var rpcApp: WebAppServer = null
+  var webApp: WebAppServer = null
+
   override def onInit() {
     // try starting the samza AM dashboard. try ten times, just in case we 
     // pick a port that's already in use.
@@ -41,13 +44,13 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
       info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
 
       try {
-        val rpcapp = new WebAppServer("/", rpcPort)
-        rpcapp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
-        rpcapp.start
+        rpcApp = new WebAppServer("/", rpcPort)
+        rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+        rpcApp.start
 
-        val webapp = new WebAppServer("/", trackingPort)
-        webapp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
-        webapp.start
+        webApp = new WebAppServer("/", trackingPort)
+        webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+        webApp.start
 
         state.rpcPort = rpcPort
         state.trackingPort = trackingPort
@@ -63,4 +66,16 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
       throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
     }
   }
+
+  override def onShutdown() {
+    if (rpcApp != null) {
+      rpcApp.context.stop
+      rpcApp.server.stop
+    }
+
+    if (webApp != null) {
+      webApp.context.stop
+      webApp.server.stop
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 1a13ee5..51097c1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -38,8 +38,8 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
 import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.api.records.Priority
 import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.util.Records
@@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.ContainerManager
 import org.apache.hadoop.yarn.api.records.LocalResourceType
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
 import org.apache.hadoop.yarn.ipc.YarnRPC
@@ -59,6 +58,11 @@ import org.apache.hadoop.net.NetUtils
 import java.util.Collections
 import java.security.PrivilegedAction
 import org.apache.samza.job.ShellCommandBuilder
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+import java.nio.ByteBuffer
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl
 
 object SamzaAppMasterTaskManager {
   val DEFAULT_CONTAINER_MEM = 256
@@ -79,7 +83,7 @@ case class TaskFailure(val count: Int, val lastFailure: Long)
  * containers, handling failures, and notifying the application master that the
  * job is done.
  */
-class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
   import SamzaAppMasterTaskManager._
 
   state.taskCount = config.getTaskCount match {
@@ -92,18 +96,28 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
   val partitions = Util.getMaxInputStreamPartitions(config)
   var taskFailures = Map[Int, TaskFailure]()
   var tooManyFailedContainers = false
+  var containerManager: NMClientImpl = null
 
   override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
 
   override def onInit() {
     state.neededContainers = state.taskCount
     state.unclaimedTasks = (0 until state.taskCount).toSet
+    containerManager = new NMClientImpl()
+    containerManager.init(conf)
+    containerManager.start
 
     info("Requesting %s containers" format state.taskCount)
 
     requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), state.neededContainers)
   }
 
+  override def onShutdown {
+    if (containerManager != null) {
+      containerManager.stop
+    }
+  }
+
   override def onContainerAllocated(container: Container) {
     val containerIdStr = ConverterUtils.toString(container.getId)
 
@@ -124,15 +138,12 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
         info("Task ID %s using command %s" format (taskId, command))
         val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
         info("Task ID %s using env %s" format (taskId, env))
-        val user = UserGroupInformation.getCurrentUser
-        info("Task ID %s using user %s" format (taskId, user))
         val path = new Path(config.getPackagePath.get)
         info("Starting task ID %s using package path %s" format (taskId, path))
 
         startContainer(
           path,
           container,
-          user,
           env.toMap,
           "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
 
@@ -273,26 +284,11 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     }
   }
 
-  protected def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
-    info("starting container %s %s %s %s %s" format (packagePath, container, ugi, env, cmds))
+  protected def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
+    info("starting container %s %s %s %s" format (packagePath, container, env, cmds))
     // connect to container manager (based on similar code in the ContainerLauncher in Hadoop MapReduce)
     val contToken = container.getContainerToken
     val address = container.getNodeId.getHost + ":" + container.getNodeId.getPort
-    var user = ugi
-
-    if (UserGroupInformation.isSecurityEnabled) {
-      debug("security is enabled")
-      val hadoopToken = new Token[ContainerTokenIdentifier](contToken.getIdentifier.array, contToken.getPassword.array, new Text(contToken.getKind), new Text(contToken.getService))
-      user = UserGroupInformation.createRemoteUser(address)
-      user.addToken(hadoopToken)
-      debug("changed user to %s" format user)
-    }
-
-    val containerManager = user.doAs(new PrivilegedAction[ContainerManager] {
-      def run(): ContainerManager = {
-        return YarnRPC.create(conf).getProxy(classOf[ContainerManager], NetUtils.createSocketAddr(address), conf).asInstanceOf[ContainerManager]
-      }
-    })
 
     // set the local package so that the containers and app master are provisioned with it
     val packageResource = Records.newRecord(classOf[LocalResource])
@@ -305,12 +301,24 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     packageResource.setType(LocalResourceType.ARCHIVE)
     packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
 
+    // copy tokens (copied from dist shell example)
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    val dob = new DataOutputBuffer
+    credentials.writeTokenStorageToStream(dob)
+    // now remove the AM->RM token so that containers cannot access it
+    val iter = credentials.getAllTokens.iterator
+    while (iter.hasNext) {
+      val token = iter.next
+      if (token.getKind.equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove
+      }
+    }
+    val allTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+
     // start the container
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
     ctx.setEnvironment(env)
-    ctx.setContainerId(container.getId())
-    ctx.setResource(container.getResource())
-    ctx.setUser(user.getShortUserName())
+    ctx.setTokens(allTokens.duplicate)
     ctx.setCommands(cmds.toList)
     ctx.setLocalResources(Collections.singletonMap("__package", packageResource))
 
@@ -319,7 +327,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
 
     val startContainerRequest = Records.newRecord(classOf[StartContainerRequest])
     startContainerRequest.setContainerLaunchContext(ctx)
-    containerManager.startContainer(startContainerRequest)
+    containerManager.startContainer(container, ctx)
   }
 
   protected def requestContainers(memMb: Int, cpuCores: Int, containers: Int) {
@@ -329,6 +337,6 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
     priority.setPriority(0)
     capability.setMemory(memMb)
     capability.setVirtualCores(cpuCores)
-    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority, containers))
+    (0 until containers).foreach(idx => amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority)))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
index 14e3865..4938192 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
@@ -21,7 +21,9 @@ package org.apache.samza.job.yarn
 
 import scala.collection.JavaConversions._
 import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.api.records.AMCommand._
+import org.apache.samza.SamzaException
 
 /**
  * YARN's API is somewhat clunky. Most implementations just sit in a loop, and
@@ -36,20 +38,27 @@ import org.apache.hadoop.yarn.client.AMRMClient
  * SamzaAppMaster uses this class to wire up all of Samza's application master
  * listeners.
  */
-class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient) extends Logging {
+class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) extends Logging {
   var isShutdown = false
 
-  def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient) = this(1000, listeners, amClient)
+  def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) = this(1000, listeners, amClient)
 
   def run {
     try {
       listeners.foreach(_.onInit)
 
       while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _)) {
-        val response = amClient.allocate(0).getAMResponse
+        val response = amClient.allocate(0)
 
-        if (response.getReboot) {
-          listeners.foreach(_.onReboot)
+        if (response.getAMCommand != null) {
+          response.getAMCommand match {
+            case AM_RESYNC | AM_SHUTDOWN =>
+              listeners.foreach(_.onReboot)
+            case _ =>
+              val msg = "Unhandled value of AMCommand: " + response.getAMCommand
+              error(msg);
+              throw new SamzaException(msg);
+          }
         }
 
         listeners.foreach(_.onEventLoop)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index bde38e1..3f03399 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -49,7 +49,6 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
       new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config."))),
       config.getAMContainerMaxMemoryMb.getOrElse(512),
       1,
-      UserGroupInformation.getCurrentUser,
       List(
         "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
           format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
index cbd7c1e..734d9d2 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
@@ -26,6 +26,7 @@ import org.apache.samza.config.Config
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 class ApplicationMasterWebServlet(config: Config, state: SamzaAppMasterState) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
@@ -38,6 +39,6 @@ class ApplicationMasterWebServlet(config: Config, state: SamzaAppMasterState) ex
     layoutTemplate("/WEB-INF/views/index.scaml",
       "config" -> TreeMap(config.toMap.toArray: _*),
       "state" -> state,
-      "rmHttpAddress" -> YarnConfiguration.getRMWebAppURL(yarnConfig))
+      "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 8bf48eb..b24f85a 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -29,64 +29,53 @@ import org.apache.hadoop.yarn.api.records.ContainerId
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.api.records.Priority
 import org.apache.hadoop.yarn.api.records.Resource
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.yarn.service._
+import org.apache.hadoop.service._
 import org.apache.samza.SamzaException
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import java.nio.ByteBuffer
 
 class TestSamzaAppMasterLifecycle {
-  val amClient = new AMRMClient {
+  val amClient = new AMRMClientImpl() {
     var host = ""
     var port = 0
     var status: FinalApplicationStatus = null
-    def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
+    override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
       this.host = appHostName
       this.port = appHostPort
       new RegisterApplicationMasterResponse {
-        def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]) = null
-        def getApplicationACLs = null
-        def setMaximumResourceCapability(r: Resource) = null
-        def getMaximumResourceCapability = new Resource {
+        override def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]) = null
+        override def getApplicationACLs = null
+        override def setMaximumResourceCapability(r: Resource) = null
+        override def getMaximumResourceCapability = new Resource {
           def getMemory = 512
           def getVirtualCores = 2
           def setMemory(memory: Int) {}
           def setVirtualCores(vCores: Int) {}
           def compareTo(o: Resource) = 0
         }
-        def setMinimumResourceCapability(r: Resource) = null
-        def getMinimumResourceCapability = new Resource {
-          def getMemory = 128
-          def getVirtualCores = 1
-          def setMemory(memory: Int) {}
-          def setVirtualCores(vCores: Int) {}
-          def compareTo(o: Resource) = 0
-        }
+        override def getClientToAMTokenMasterKey = null
+        override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
       }
     }
-    def allocate(progressIndicator: Float): AllocateResponse = null
-    def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
+    override def allocate(progressIndicator: Float): AllocateResponse = null
+    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
       appMessage: String,
       appTrackingUrl: String) {
       this.status = appStatus
     }
-    def addContainerRequest(req: ContainerRequest) {}
-    def removeContainerRequest(req: ContainerRequest) {}
-    def releaseAssignedContainer(containerId: ContainerId) {}
-    def getClusterAvailableResources(): Resource = null
-    def getClusterNodeCount() = 1
+    override def releaseAssignedContainer(containerId: ContainerId) {}
+    override def getClusterNodeCount() = 1
 
-    def init(config: Configuration) {}
-    def start() {}
-    def stop() {}
-    def register(listener: ServiceStateChangeListener) {}
-    def unregister(listener: ServiceStateChangeListener) {}
-    def getName(): String = ""
-    def getConfig() = null
-    def getServiceState() = null
-    def getStartTime() = 0L
+    override def init(config: Configuration) {}
+    override def start() {}
+    override def stop() {}
+    override def getName(): String = ""
+    override def getConfig() = null
+    override def getStartTime() = 0L
   }
 
   @Test
@@ -125,9 +114,7 @@ class TestSamzaAppMasterLifecycle {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
     state.rpcPort = 1
     List(new SamzaAppMasterLifecycle(768, 1, state, amClient, new YarnConfiguration),
-      new SamzaAppMasterLifecycle(0, 1, state, amClient, new YarnConfiguration),
-      new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration),
-      new SamzaAppMasterLifecycle(768, 0, state, amClient, new YarnConfiguration)).map(saml => {
+      new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration)).map(saml => {
         saml.onInit
         assertTrue(saml.shouldShutdown)
       })

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index c9f7029..ee3ffef 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -32,9 +32,10 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.service._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
+import org.apache.hadoop.service._
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.NodeReport
 import TestSamzaAppMasterTaskManager._
@@ -45,90 +46,89 @@ import org.apache.samza.SamzaException
 
 object TestSamzaAppMasterTaskManager {
   def getContainer(containerId: ContainerId) = new Container {
-    def getId(): ContainerId = containerId
-    def setId(id: ContainerId) {}
-    def getNodeId(): NodeId = new NodeId {
+    override def getId(): ContainerId = containerId
+    override def setId(id: ContainerId) {}
+    override def getNodeId(): NodeId = new NodeId {
       var host = ""
       var port = 12345
-      def getHost() = host
-      def setHost(host: String) = {
+      override def getHost() = host
+      override def setHost(host: String) = {
         this.host = host
       }
-      def getPort() = port
-      def setPort(port: Int) = {
+      override def getPort() = port
+      override def setPort(port: Int) = {
         this.port = port
       }
+      override def build() = null
     }
-    def setNodeId(nodeId: NodeId) {}
-    def getNodeHttpAddress(): String = ""
-    def setNodeHttpAddress(nodeHttpAddress: String) {}
-    def getResource(): Resource = null
-    def setResource(resource: Resource) {}
-    def getPriority(): Priority = null
-    def setPriority(priority: Priority) {}
-    def getState(): ContainerState = null
-    def setState(state: ContainerState) {}
-    def getContainerToken(): ContainerToken = null
-    def setContainerToken(containerToken: ContainerToken) {}
-    def getContainerStatus(): ContainerStatus = null
-    def setContainerStatus(containerStatus: ContainerStatus) {}
-    def compareTo(c: Container): Int = containerId.compareTo(c.getId)
+    override def setNodeId(nodeId: NodeId) {}
+    override def getNodeHttpAddress(): String = ""
+    override def setNodeHttpAddress(nodeHttpAddress: String) {}
+    override def getResource(): Resource = null
+    override def setResource(resource: Resource) {}
+    override def getPriority(): Priority = null
+    override def setPriority(priority: Priority) {}
+    override def getContainerToken(): Token = null
+    override def setContainerToken(containerToken: Token) {}
+    override def compareTo(c: Container): Int = containerId.compareTo(c.getId)
   }
 
   def getContainerStatus(containerId: ContainerId, exitCode: Int, diagnostic: String) = new ContainerStatus {
-    def getContainerId(): ContainerId = containerId
-    def setContainerId(containerId: ContainerId) {}
-    def getState(): ContainerState = null
-    def setState(state: ContainerState) {}
-    def getExitStatus(): Int = exitCode
-    def setExitStatus(exitStatus: Int) {}
-    def getDiagnostics() = diagnostic
-    def setDiagnostics(diagnostics: String) = {}
+    override def getContainerId(): ContainerId = containerId
+    override def setContainerId(containerId: ContainerId) {}
+    override def getState(): ContainerState = null
+    override def setState(state: ContainerState) {}
+    override def getExitStatus(): Int = exitCode
+    override def setExitStatus(exitStatus: Int) {}
+    override def getDiagnostics() = diagnostic
+    override def setDiagnostics(diagnostics: String) = {}
   }
 
-  def getAmClient = (response: AllocateResponse) => new AMRMClient {
+  def getAmClient = (response: AllocateResponse) => new AMRMClientImpl[ContainerRequest] {
     var requests: List[ContainerRequest] = List[ContainerRequest]()
-    var release: List[ContainerId] = List[ContainerId]()
-
-    def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = null
-    def allocate(progressIndicator: Float): AllocateResponse = response
-    def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) = null
-    def addContainerRequest(req: ContainerRequest) { requests ::= req }
-    def removeContainerRequest(req: ContainerRequest) {}
-    def releaseAssignedContainer(containerId: ContainerId) { release ::= containerId }
-    def getClusterAvailableResources(): Resource = null
-    def getClusterNodeCount() = 1
-
-    def init(config: Configuration) {}
-    def start() {}
-    def stop() {}
-    def register(listener: ServiceStateChangeListener) {}
-    def unregister(listener: ServiceStateChangeListener) {}
-    def getName(): String = ""
-    def getConfig() = null
-    def getServiceState() = null
-    def getStartTime() = 0L
+
+    def getRelease = release
+    def resetRelease = release.clear
+    override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = null
+    override def allocate(progressIndicator: Float): AllocateResponse = response
+    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) = null
+    override def addContainerRequest(req: ContainerRequest) { requests ::= req }
+    override def removeContainerRequest(req: ContainerRequest) {}
+    override def getClusterNodeCount() = 1
+
+    override def init(config: Configuration) {}
+    override def start() {}
+    override def stop() {}
+    override def getName(): String = ""
+    override def getConfig() = null
+    override def getStartTime() = 0L
   }
 
   def getAppMasterResponse(reboot: Boolean, containers: List[Container], completed: List[ContainerStatus]) =
     new AllocateResponse {
-      def getAMResponse = new AMResponse {
-        def getReboot(): Boolean = reboot
-        def setReboot(reboot: Boolean) {}
-        def getResponseId() = 0
-        def setResponseId(responseId: Int) {}
-        def getAllocatedContainers() = containers
-        def setAllocatedContainers(containers: java.util.List[Container]) {}
-        def getAvailableResources(): Resource = null
-        def setAvailableResources(limit: Resource) {}
-        def getCompletedContainersStatuses() = completed
-        def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
-        def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
-        def getUpdatedNodes = null
+      override def getResponseId() = 0
+      override def setResponseId(responseId: Int) {}
+      override def getAllocatedContainers() = containers
+      override def setAllocatedContainers(containers: java.util.List[Container]) {}
+      override def getAvailableResources(): Resource = null
+      override def setAvailableResources(limit: Resource) {}
+      override def getCompletedContainersStatuses() = completed
+      override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
+      override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
+      override def getUpdatedNodes = null
+      override def getNumClusterNodes = 1
+      override def setNumClusterNodes(num: Int) {}
+      override def getNMTokens = null
+      override def setNMTokens(nmTokens: java.util.List[NMToken]) {}
+      override def setAMCommand(command: AMCommand) {}
+      override def getPreemptionMessage = null
+      override def setPreemptionMessage(request: PreemptionMessage) {}
+
+      override def getAMCommand = if (reboot) {
+        AMCommand.AM_RESYNC
+      } else {
+        null
       }
-      def getNumClusterNodes = 1
-      def setNumClusterNodes(num: Int) {}
-      def setAMResponse(response: AMResponse) {}
     }
 }
 
@@ -169,7 +169,7 @@ class TestSamzaAppMasterTaskManager {
     val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
         // Do nothing.
       }
     }
@@ -182,13 +182,13 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == false)
     // 2. First is from onInit, second is from onContainerCompleted, since it failed.
     assertEquals(2, amClient.requests.size)
-    assertEquals(0, amClient.release.size)
+    assertEquals(0, amClient.getRelease.size)
     assertFalse(taskManager.shouldShutdown)
     // Now trigger an AM shutdown since our retry count is 1, and we're failing twice
     taskManager.onContainerAllocated(getContainer(container2))
     taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
     assertEquals(2, amClient.requests.size)
-    assertEquals(0, amClient.release.size)
+    assertEquals(0, amClient.getRelease.size)
     assertTrue(taskManager.shouldShutdown)
   }
 
@@ -200,7 +200,7 @@ class TestSamzaAppMasterTaskManager {
     var containersRequested = 0
     var containersStarted = 0
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
         containersStarted += 1
       }
 
@@ -216,7 +216,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onInit
     assert(taskManager.shouldShutdown == false)
     assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
 
     // allocate container 2
     taskManager.onContainerAllocated(getContainer(container2))
@@ -234,12 +234,12 @@ class TestSamzaAppMasterTaskManager {
     assert(state.taskPartitions.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 1)
-    assert(amClient.release(0).equals(container3))
+    assert(amClient.getRelease.size == 1)
+    assert(amClient.getRelease.head.equals(container3))
 
     // reset the helper state, so we can make sure that releasing the container (next step) doesn't request more resources
     amClient.requests = List()
-    amClient.release = List()
+    amClient.resetRelease
 
     // now release the container, and make sure the AM doesn't ask for more
     assert(taskManager.shouldShutdown == false)
@@ -250,14 +250,14 @@ class TestSamzaAppMasterTaskManager {
     assert(state.taskPartitions.size == 1)
     assert(state.unclaimedTasks.size == 0)
     assert(amClient.requests.size == 0)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
 
     // pretend container 2 is released due to an NM failure, and make sure that the AM requests a new container
     assert(taskManager.shouldShutdown == false)
     taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure"))
     assert(taskManager.shouldShutdown == false)
     assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
   }
 
   @Test
@@ -270,7 +270,7 @@ class TestSamzaAppMasterTaskManager {
     state.taskCount = 2
     var containersStarted = 0
     val taskManager = new SamzaAppMasterTaskManager(clock, newConfig, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
         containersStarted += 1
       }
     }
@@ -280,8 +280,8 @@ class TestSamzaAppMasterTaskManager {
     assert(taskManager.shouldShutdown == false)
     taskManager.onInit
     assert(taskManager.shouldShutdown == false)
-    assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.requests.size == 2)
+    assert(amClient.getRelease.size == 0)
     taskManager.onContainerAllocated(getContainer(container2))
     assert(state.neededContainers == 1)
     assert(state.runningTasks.size == 1)
@@ -337,7 +337,7 @@ class TestSamzaAppMasterTaskManager {
     var containersRequested = 0
     var containersStarted = 0
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
         containersStarted += 1
       }
 
@@ -353,7 +353,7 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onInit
     assert(taskManager.shouldShutdown == false)
     assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
     assert(state.neededContainers == 1)
     assert(state.runningTasks.size == 0)
     assert(state.taskPartitions.size == 0)
@@ -373,8 +373,8 @@ class TestSamzaAppMasterTaskManager {
     assert(containersRequested == 1)
     assert(containersStarted == 1)
     assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 1)
-    assert(amClient.release(0).equals(container3))
+    assert(amClient.getRelease.size == 1)
+    assert(amClient.getRelease.head.equals(container3))
   }
 
   @Test
@@ -429,6 +429,6 @@ class MockSystemFactory extends SystemFactory {
 
 class MockSinglePartitionManager extends SystemAdmin {
   def getPartitions(streamName: String) = Set(new Partition(0))
-  
+
   def getLastOffsets(streams: java.util.Set[String]) = throw new SamzaException("Need to implement this")
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
index 0040648..98f7844 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.records.Container
 import org.apache.hadoop.yarn.api.records.ContainerStatus
 import org.apache.hadoop.yarn.api.records.ResourceRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.AMResponse
 import org.apache.hadoop.yarn.api.records.ContainerId
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse