You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/22 22:14:44 UTC
git commit: SAMZA-9; upgrading samza to yarn 2.2.0.
Updated Branches:
refs/heads/master 2cd18279c -> 4217fc7f9
SAMZA-9; upgrading samza to yarn 2.2.0.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4217fc7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4217fc7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4217fc7f
Branch: refs/heads/master
Commit: 4217fc7f9dd951c2c84b89e06daf8a1b4b0a420f
Parents: 2cd1827
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Oct 22 13:14:25 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Oct 22 13:14:25 2013 -0700
----------------------------------------------------------------------
build.gradle | 4 -
gradle.properties | 1 -
gradle/dependency-versions.gradle | 1 +
.../apache/samza/job/yarn/ClientHelper.scala | 102 ++++-------
.../apache/samza/job/yarn/SamzaAppMaster.scala | 13 +-
.../job/yarn/SamzaAppMasterLifecycle.scala | 11 +-
.../samza/job/yarn/SamzaAppMasterService.scala | 27 ++-
.../job/yarn/SamzaAppMasterTaskManager.scala | 66 +++----
.../apache/samza/job/yarn/YarnAppMaster.scala | 21 ++-
.../org/apache/samza/job/yarn/YarnJob.scala | 1 -
.../webapp/ApplicationMasterWebServlet.scala | 3 +-
.../job/yarn/TestSamzaAppMasterLifecycle.scala | 59 +++----
.../yarn/TestSamzaAppMasterTaskManager.scala | 174 +++++++++----------
.../samza/job/yarn/TestYarnAppMaster.scala | 1 -
14 files changed, 234 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1482ea9..f30128f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -104,10 +104,6 @@ project(":samza-serializers_$scalaVersion") {
project(":samza-yarn_$scalaVersion") {
apply plugin: 'scala'
- jar {
- classifier = "yarn-$yarnVersion"
- }
-
dependencies {
compile project(':samza-api')
compile project(":samza-core_$scalaVersion")
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index ed6390b..4bfa1c3 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,4 +1,3 @@
group=org.apache.samza
version=0.7.0
scalaVersion=2.9.2
-yarnVersion=2.0.5-alpha
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 5f3fb32..f1f8831 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -9,4 +9,5 @@ ext {
kafkaVersion = "0.8.1-SNAPSHOT"
commonsHttpClientVersion = "3.1"
leveldbVersion = "1.7"
+ yarnVersion = "2.2.0"
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index b2b529e..2339960 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -19,44 +19,37 @@
package org.apache.samza.job.yarn
-import java.util.Collections
-
import scala.collection.JavaConversions._
import scala.collection.Map
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.api.records.ApplicationReport
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.LocalResource
import org.apache.hadoop.yarn.api.records.LocalResourceType
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.api.records.YarnApplicationState
-import org.apache.hadoop.yarn.api.ClientRMProtocol
+import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.util.Records
-
+import org.apache.samza.SamzaException
import org.apache.samza.job.ApplicationStatus
import org.apache.samza.job.ApplicationStatus.New
import org.apache.samza.job.ApplicationStatus.Running
import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
-
import grizzled.slf4j.Logging
-import org.apache.samza.SamzaException
+import java.util.Collections
+
+object ClientHelper {
+ val applicationType = "Samza"
+}
/**
* Client helper class required to submit an application master start script to the resource manager. Also
@@ -64,41 +57,27 @@ import org.apache.samza.SamzaException
* container and its processes.
*/
class ClientHelper(conf: Configuration) extends Logging {
- val rpc = YarnRPC.create(conf)
- val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
- info("trying to connect to RM %s" format rmAddress)
- val applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf).asInstanceOf[ClientRMProtocol]
+ val yarnClient = YarnClient.createYarnClient
+ info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
+ yarnClient.init(conf);
+ yarnClient.start
var appId: Option[ApplicationId] = None
/**
* Generate an application and submit it to the resource manager to start an application master
*/
- def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, user: UserGroupInformation, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
- val newAppRequest = Records.newRecord(classOf[GetNewApplicationRequest])
- val newAppResponse = applicationsManager.getNewApplication(newAppRequest)
+ def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
+ val app = yarnClient.createApplication
+ val newAppResponse = app.getNewApplicationResponse
var mem = memoryMb
var cpu = cpuCore
- // If we are asking for memory less than the minimum required, bump it
- if (mem < newAppResponse.getMinimumResourceCapability().getMemory()) {
- val min = newAppResponse.getMinimumResourceCapability().getMemory()
- warn("requesting %s megs of memory, which is less than minimum capability of %s, so using minimum" format (mem, min))
- mem = min
- }
-
// If we are asking for memory more than the max allowed, shout out
if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format
(mem, newAppResponse.getMaximumResourceCapability().getMemory()))
}
- // if we are asking for cpu less than the minimum required, bump it
- if (cpu < newAppResponse.getMinimumResourceCapability().getVirtualCores()) {
- val min = newAppResponse.getMinimumResourceCapability.getVirtualCores()
- warn("requesting %s virtual cores of cpu, which is less than minimum capability of %s, so using minimum" format (cpu, min))
- cpu = min
- }
-
// If we are asking for cpu more than the max allowed, shout out
if (cpu > newAppResponse.getMaximumResourceCapability().getVirtualCores()) {
throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format
@@ -109,10 +88,9 @@ class ClientHelper(conf: Configuration) extends Logging {
info("preparing to request resources for app id %s" format appId.get)
- val appCtx = Records.newRecord(classOf[ApplicationSubmissionContext])
+ val appCtx = app.getApplicationSubmissionContext
val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
val resource = Records.newRecord(classOf[Resource])
- val submitAppRequest = Records.newRecord(classOf[SubmitApplicationRequest])
val packageResource = Records.newRecord(classOf[LocalResource])
name match {
@@ -144,54 +122,48 @@ class ClientHelper(conf: Configuration) extends Logging {
info("set memory request to %s for %s" format (mem, appId.get))
resource.setVirtualCores(cpu)
info("set cpu core request to %s for %s" format (cpu, appId.get))
- containerCtx.setResource(resource)
+ appCtx.setResource(resource)
containerCtx.setCommands(cmds.toList)
info("set command to %s for %s" format (cmds, appId.get))
containerCtx.setLocalResources(Collections.singletonMap("__package", packageResource))
appCtx.setApplicationId(appId.get)
- info("set app ID to %s" format (user, appId.get))
- appCtx.setUser(user.getShortUserName)
- info("set user to %s for %s" format (user, appId.get))
+ info("set app ID to %s" format appId.get)
appCtx.setAMContainerSpec(containerCtx)
- submitAppRequest.setApplicationSubmissionContext(appCtx)
+ appCtx.setApplicationType(ClientHelper.applicationType)
info("submitting application request for %s" format appId.get)
- applicationsManager.submitApplication(submitAppRequest)
+ yarnClient.submitApplication(appCtx)
appId
}
def status(appId: ApplicationId): Option[ApplicationStatus] = {
- val statusRequest = Records.newRecord(classOf[GetApplicationReportRequest])
- statusRequest.setApplicationId(appId)
- val statusResponse = applicationsManager.getApplicationReport(statusRequest)
- convertState(statusResponse.getApplicationReport)
+ val statusResponse = yarnClient.getApplicationReport(appId)
+ convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
}
def kill(appId: ApplicationId) {
- val killRequest = Records.newRecord(classOf[KillApplicationRequest])
- killRequest.setApplicationId(appId)
- applicationsManager.forceKillApplication(killRequest)
+ yarnClient.killApplication(appId)
}
def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = {
- val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
- val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
-
- getAppsRsp.getApplicationList.filter(appRep => appId.equals(appRep.getApplicationId())).headOption
+ yarnClient
+ .getApplications
+ .filter(appRep => appId.equals(appRep.getApplicationId()))
+ .headOption
}
def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = {
- val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
- val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
+ val getAppsRsp = yarnClient.getApplications
status match {
- case Some(status) => getAppsRsp.getApplicationList
- .filter(appRep => status.equals(convertState(appRep).get)).toList
- case None => getAppsRsp.getApplicationList.toList
+ case Some(status) => getAppsRsp
+ .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
+ .toList
+ case None => getAppsRsp.toList
}
}
- private def convertState(appReport: ApplicationReport): Option[ApplicationStatus] = {
- (appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus()) match {
+ private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
+ (state, status) match {
case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 7f830f2..1cac06a 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import scala.collection.JavaConversions._
import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap }
import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.AMRMClientImpl
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
import org.apache.samza.config.YarnConfig._
import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
/**
* When YARN executes an application master, it needs a bash command to
@@ -44,22 +45,22 @@ import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
*/
object SamzaAppMaster extends Logging {
def main(args: Array[String]) {
- val containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV)
+ val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
info("got container id: %s" format containerIdStr)
val containerId = ConverterUtils.toContainerId(containerIdStr)
val applicationAttemptId = containerId.getApplicationAttemptId
info("got app attempt id: %s" format applicationAttemptId)
- val nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV)
+ val nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString)
info("got node manager host: %s" format nodeHostString)
- val nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV)
+ val nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString)
info("got node manager port: %s" format nodePortString)
- val nodeHttpPortString = System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV)
+ val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
info("got node manager http port: %s" format nodeHttpPortString)
val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(YarnConfig.ENV_CONFIG)))
info("got config: %s" format config)
val hConfig = new YarnConfiguration
hConfig.set("fs.http.impl", "samza.util.hadoop.HttpFileSystem")
- val amClient = new AMRMClientImpl(applicationAttemptId)
+ val amClient = new AMRMClientImpl[ContainerRequest]
val clientHelper = new ClientHelper(hConfig)
val registry = new MetricsRegistryMap
val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
index 95a6f05..5d09265 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -20,7 +20,7 @@
package org.apache.samza.job.yarn
import grizzled.slf4j.Logging
import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
* this means registering and unregistering with the RM, and shutting down
* when the RM tells us to Reboot.
*/
-class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient[_], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
var validResourceRequest = true
var shutdownMessage: String = null
@@ -43,15 +43,12 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza
// validate that the YARN cluster can handle our container resource requirements
val maxCapability = response.getMaximumResourceCapability
- val minCapability = response.getMinimumResourceCapability
val maxMem = maxCapability.getMemory
- val minMem = minCapability.getMemory
val maxCpu = maxCapability.getVirtualCores
- val minCpu = minCapability.getVirtualCores
- info("Got AM register response. The YARN RM supports container requests with max-mem: %s, min-mem: %s, max-cpu: %s, min-cpu: %s" format (maxMem, minMem, maxCpu, minCpu))
+ info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu))
- if (containerMem < minMem || containerMem > maxMem || containerCpu < minCpu || containerCpu > maxCpu) {
+ if (containerMem > maxMem || containerCpu > maxCpu) {
shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
error(shutdownMessage)
validResourceRequest = false
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index ce3fcc3..82d90d4 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -32,6 +32,9 @@ import org.apache.samza.SamzaException
* up the web service when initialized.
*/
class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends YarnAppMasterListener with Logging {
+ var rpcApp: WebAppServer = null
+ var webApp: WebAppServer = null
+
override def onInit() {
// try starting the samza AM dashboard. try ten times, just in case we
// pick a port that's already in use.
@@ -41,13 +44,13 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
try {
- val rpcapp = new WebAppServer("/", rpcPort)
- rpcapp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
- rpcapp.start
+ rpcApp = new WebAppServer("/", rpcPort)
+ rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+ rpcApp.start
- val webapp = new WebAppServer("/", trackingPort)
- webapp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
- webapp.start
+ webApp = new WebAppServer("/", trackingPort)
+ webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+ webApp.start
state.rpcPort = rpcPort
state.trackingPort = trackingPort
@@ -63,4 +66,16 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
}
}
+
+ override def onShutdown() {
+ if (rpcApp != null) {
+ rpcApp.context.stop
+ rpcApp.server.stop
+ }
+
+ if (webApp != null) {
+ webApp.context.stop
+ webApp.server.stop
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 1a13ee5..51097c1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -38,8 +38,8 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.samza.util.Util
import scala.collection.JavaConversions._
import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.api.records.Priority
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.util.Records
@@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.ContainerManager
import org.apache.hadoop.yarn.api.records.LocalResourceType
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
import org.apache.hadoop.yarn.ipc.YarnRPC
@@ -59,6 +58,11 @@ import org.apache.hadoop.net.NetUtils
import java.util.Collections
import java.security.PrivilegedAction
import org.apache.samza.job.ShellCommandBuilder
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+import java.nio.ByteBuffer
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl
object SamzaAppMasterTaskManager {
val DEFAULT_CONTAINER_MEM = 256
@@ -79,7 +83,7 @@ case class TaskFailure(val count: Int, val lastFailure: Long)
* containers, handling failures, and notifying the application master that the
* job is done.
*/
-class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
import SamzaAppMasterTaskManager._
state.taskCount = config.getTaskCount match {
@@ -92,18 +96,28 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
val partitions = Util.getMaxInputStreamPartitions(config)
var taskFailures = Map[Int, TaskFailure]()
var tooManyFailedContainers = false
+ var containerManager: NMClientImpl = null
override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
override def onInit() {
state.neededContainers = state.taskCount
state.unclaimedTasks = (0 until state.taskCount).toSet
+ containerManager = new NMClientImpl()
+ containerManager.init(conf)
+ containerManager.start
info("Requesting %s containers" format state.taskCount)
requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), state.neededContainers)
}
+ override def onShutdown {
+ if (containerManager != null) {
+ containerManager.stop
+ }
+ }
+
override def onContainerAllocated(container: Container) {
val containerIdStr = ConverterUtils.toString(container.getId)
@@ -124,15 +138,12 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
info("Task ID %s using command %s" format (taskId, command))
val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
info("Task ID %s using env %s" format (taskId, env))
- val user = UserGroupInformation.getCurrentUser
- info("Task ID %s using user %s" format (taskId, user))
val path = new Path(config.getPackagePath.get)
info("Starting task ID %s using package path %s" format (taskId, path))
startContainer(
path,
container,
- user,
env.toMap,
"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
@@ -273,26 +284,11 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
}
}
- protected def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
- info("starting container %s %s %s %s %s" format (packagePath, container, ugi, env, cmds))
+ protected def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
+ info("starting container %s %s %s %s" format (packagePath, container, env, cmds))
// connect to container manager (based on similar code in the ContainerLauncher in Hadoop MapReduce)
val contToken = container.getContainerToken
val address = container.getNodeId.getHost + ":" + container.getNodeId.getPort
- var user = ugi
-
- if (UserGroupInformation.isSecurityEnabled) {
- debug("security is enabled")
- val hadoopToken = new Token[ContainerTokenIdentifier](contToken.getIdentifier.array, contToken.getPassword.array, new Text(contToken.getKind), new Text(contToken.getService))
- user = UserGroupInformation.createRemoteUser(address)
- user.addToken(hadoopToken)
- debug("changed user to %s" format user)
- }
-
- val containerManager = user.doAs(new PrivilegedAction[ContainerManager] {
- def run(): ContainerManager = {
- return YarnRPC.create(conf).getProxy(classOf[ContainerManager], NetUtils.createSocketAddr(address), conf).asInstanceOf[ContainerManager]
- }
- })
// set the local package so that the containers and app master are provisioned with it
val packageResource = Records.newRecord(classOf[LocalResource])
@@ -305,12 +301,24 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
packageResource.setType(LocalResourceType.ARCHIVE)
packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
+ // copy tokens (copied from dist shell example)
+ val credentials = UserGroupInformation.getCurrentUser.getCredentials
+ val dob = new DataOutputBuffer
+ credentials.writeTokenStorageToStream(dob)
+ // now remove the AM->RM token so that containers cannot access it
+ val iter = credentials.getAllTokens.iterator
+ while (iter.hasNext) {
+ val token = iter.next
+ if (token.getKind.equals(AMRMTokenIdentifier.KIND_NAME)) {
+ iter.remove
+ }
+ }
+ val allTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+
// start the container
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
ctx.setEnvironment(env)
- ctx.setContainerId(container.getId())
- ctx.setResource(container.getResource())
- ctx.setUser(user.getShortUserName())
+ ctx.setTokens(allTokens.duplicate)
ctx.setCommands(cmds.toList)
ctx.setLocalResources(Collections.singletonMap("__package", packageResource))
@@ -319,7 +327,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
val startContainerRequest = Records.newRecord(classOf[StartContainerRequest])
startContainerRequest.setContainerLaunchContext(ctx)
- containerManager.startContainer(startContainerRequest)
+ containerManager.startContainer(container, ctx)
}
protected def requestContainers(memMb: Int, cpuCores: Int, containers: Int) {
@@ -329,6 +337,6 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
priority.setPriority(0)
capability.setMemory(memMb)
capability.setVirtualCores(cpuCores)
- amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority, containers))
+ (0 until containers).foreach(idx => amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority)))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
index 14e3865..4938192 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
@@ -21,7 +21,9 @@ package org.apache.samza.job.yarn
import scala.collection.JavaConversions._
import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.api.records.AMCommand._
+import org.apache.samza.SamzaException
/**
* YARN's API is somewhat clunky. Most implementations just sit in a loop, and
@@ -36,20 +38,27 @@ import org.apache.hadoop.yarn.client.AMRMClient
* SamzaAppMaster uses this class to wire up all of Samza's application master
* listeners.
*/
-class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient) extends Logging {
+class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) extends Logging {
var isShutdown = false
- def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient) = this(1000, listeners, amClient)
+ def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) = this(1000, listeners, amClient)
def run {
try {
listeners.foreach(_.onInit)
while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _)) {
- val response = amClient.allocate(0).getAMResponse
+ val response = amClient.allocate(0)
- if (response.getReboot) {
- listeners.foreach(_.onReboot)
+ if (response.getAMCommand != null) {
+ response.getAMCommand match {
+ case AM_RESYNC | AM_SHUTDOWN =>
+ listeners.foreach(_.onReboot)
+ case _ =>
+ val msg = "Unhandled value of AMCommand: " + response.getAMCommand
+ error(msg);
+ throw new SamzaException(msg);
+ }
}
listeners.foreach(_.onEventLoop)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index bde38e1..3f03399 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -49,7 +49,6 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config."))),
config.getAMContainerMaxMemoryMb.getOrElse(512),
1,
- UserGroupInformation.getCurrentUser,
List(
"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
index cbd7c1e..734d9d2 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
@@ -26,6 +26,7 @@ import org.apache.samza.config.Config
import scala.collection.JavaConversions._
import scala.collection.immutable.TreeMap
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
class ApplicationMasterWebServlet(config: Config, state: SamzaAppMasterState) extends ScalatraServlet with ScalateSupport {
val yarnConfig = new YarnConfiguration
@@ -38,6 +39,6 @@ class ApplicationMasterWebServlet(config: Config, state: SamzaAppMasterState) ex
layoutTemplate("/WEB-INF/views/index.scaml",
"config" -> TreeMap(config.toMap.toArray: _*),
"state" -> state,
- "rmHttpAddress" -> YarnConfiguration.getRMWebAppURL(yarnConfig))
+ "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 8bf48eb..b24f85a 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -29,64 +29,53 @@ import org.apache.hadoop.yarn.api.records.ContainerId
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.api.records.Priority
import org.apache.hadoop.yarn.api.records.Resource
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.yarn.service._
+import org.apache.hadoop.service._
import org.apache.samza.SamzaException
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import java.nio.ByteBuffer
class TestSamzaAppMasterLifecycle {
- val amClient = new AMRMClient {
+ val amClient = new AMRMClientImpl() {
var host = ""
var port = 0
var status: FinalApplicationStatus = null
- def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
+ override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
this.host = appHostName
this.port = appHostPort
new RegisterApplicationMasterResponse {
- def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]) = null
- def getApplicationACLs = null
- def setMaximumResourceCapability(r: Resource) = null
- def getMaximumResourceCapability = new Resource {
+ override def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]) = null
+ override def getApplicationACLs = null
+ override def setMaximumResourceCapability(r: Resource) = null
+ override def getMaximumResourceCapability = new Resource {
def getMemory = 512
def getVirtualCores = 2
def setMemory(memory: Int) {}
def setVirtualCores(vCores: Int) {}
def compareTo(o: Resource) = 0
}
- def setMinimumResourceCapability(r: Resource) = null
- def getMinimumResourceCapability = new Resource {
- def getMemory = 128
- def getVirtualCores = 1
- def setMemory(memory: Int) {}
- def setVirtualCores(vCores: Int) {}
- def compareTo(o: Resource) = 0
- }
+ override def getClientToAMTokenMasterKey = null
+ override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
}
}
- def allocate(progressIndicator: Float): AllocateResponse = null
- def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
+ override def allocate(progressIndicator: Float): AllocateResponse = null
+ override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
appMessage: String,
appTrackingUrl: String) {
this.status = appStatus
}
- def addContainerRequest(req: ContainerRequest) {}
- def removeContainerRequest(req: ContainerRequest) {}
- def releaseAssignedContainer(containerId: ContainerId) {}
- def getClusterAvailableResources(): Resource = null
- def getClusterNodeCount() = 1
+ override def releaseAssignedContainer(containerId: ContainerId) {}
+ override def getClusterNodeCount() = 1
- def init(config: Configuration) {}
- def start() {}
- def stop() {}
- def register(listener: ServiceStateChangeListener) {}
- def unregister(listener: ServiceStateChangeListener) {}
- def getName(): String = ""
- def getConfig() = null
- def getServiceState() = null
- def getStartTime() = 0L
+ override def init(config: Configuration) {}
+ override def start() {}
+ override def stop() {}
+ override def getName(): String = ""
+ override def getConfig() = null
+ override def getStartTime() = 0L
}
@Test
@@ -125,9 +114,7 @@ class TestSamzaAppMasterLifecycle {
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
state.rpcPort = 1
List(new SamzaAppMasterLifecycle(768, 1, state, amClient, new YarnConfiguration),
- new SamzaAppMasterLifecycle(0, 1, state, amClient, new YarnConfiguration),
- new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration),
- new SamzaAppMasterLifecycle(768, 0, state, amClient, new YarnConfiguration)).map(saml => {
+ new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration)).map(saml => {
saml.onInit
assertTrue(saml.shouldShutdown)
})
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index c9f7029..ee3ffef 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -32,9 +32,10 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.service._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
+import org.apache.hadoop.service._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.NodeReport
import TestSamzaAppMasterTaskManager._
@@ -45,90 +46,89 @@ import org.apache.samza.SamzaException
object TestSamzaAppMasterTaskManager {
def getContainer(containerId: ContainerId) = new Container {
- def getId(): ContainerId = containerId
- def setId(id: ContainerId) {}
- def getNodeId(): NodeId = new NodeId {
+ override def getId(): ContainerId = containerId
+ override def setId(id: ContainerId) {}
+ override def getNodeId(): NodeId = new NodeId {
var host = ""
var port = 12345
- def getHost() = host
- def setHost(host: String) = {
+ override def getHost() = host
+ override def setHost(host: String) = {
this.host = host
}
- def getPort() = port
- def setPort(port: Int) = {
+ override def getPort() = port
+ override def setPort(port: Int) = {
this.port = port
}
+ override def build() = null
}
- def setNodeId(nodeId: NodeId) {}
- def getNodeHttpAddress(): String = ""
- def setNodeHttpAddress(nodeHttpAddress: String) {}
- def getResource(): Resource = null
- def setResource(resource: Resource) {}
- def getPriority(): Priority = null
- def setPriority(priority: Priority) {}
- def getState(): ContainerState = null
- def setState(state: ContainerState) {}
- def getContainerToken(): ContainerToken = null
- def setContainerToken(containerToken: ContainerToken) {}
- def getContainerStatus(): ContainerStatus = null
- def setContainerStatus(containerStatus: ContainerStatus) {}
- def compareTo(c: Container): Int = containerId.compareTo(c.getId)
+ override def setNodeId(nodeId: NodeId) {}
+ override def getNodeHttpAddress(): String = ""
+ override def setNodeHttpAddress(nodeHttpAddress: String) {}
+ override def getResource(): Resource = null
+ override def setResource(resource: Resource) {}
+ override def getPriority(): Priority = null
+ override def setPriority(priority: Priority) {}
+ override def getContainerToken(): Token = null
+ override def setContainerToken(containerToken: Token) {}
+ override def compareTo(c: Container): Int = containerId.compareTo(c.getId)
}
def getContainerStatus(containerId: ContainerId, exitCode: Int, diagnostic: String) = new ContainerStatus {
- def getContainerId(): ContainerId = containerId
- def setContainerId(containerId: ContainerId) {}
- def getState(): ContainerState = null
- def setState(state: ContainerState) {}
- def getExitStatus(): Int = exitCode
- def setExitStatus(exitStatus: Int) {}
- def getDiagnostics() = diagnostic
- def setDiagnostics(diagnostics: String) = {}
+ override def getContainerId(): ContainerId = containerId
+ override def setContainerId(containerId: ContainerId) {}
+ override def getState(): ContainerState = null
+ override def setState(state: ContainerState) {}
+ override def getExitStatus(): Int = exitCode
+ override def setExitStatus(exitStatus: Int) {}
+ override def getDiagnostics() = diagnostic
+ override def setDiagnostics(diagnostics: String) = {}
}
- def getAmClient = (response: AllocateResponse) => new AMRMClient {
+ def getAmClient = (response: AllocateResponse) => new AMRMClientImpl[ContainerRequest] {
var requests: List[ContainerRequest] = List[ContainerRequest]()
- var release: List[ContainerId] = List[ContainerId]()
-
- def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = null
- def allocate(progressIndicator: Float): AllocateResponse = response
- def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) = null
- def addContainerRequest(req: ContainerRequest) { requests ::= req }
- def removeContainerRequest(req: ContainerRequest) {}
- def releaseAssignedContainer(containerId: ContainerId) { release ::= containerId }
- def getClusterAvailableResources(): Resource = null
- def getClusterNodeCount() = 1
-
- def init(config: Configuration) {}
- def start() {}
- def stop() {}
- def register(listener: ServiceStateChangeListener) {}
- def unregister(listener: ServiceStateChangeListener) {}
- def getName(): String = ""
- def getConfig() = null
- def getServiceState() = null
- def getStartTime() = 0L
+
+ def getRelease = release
+ def resetRelease = release.clear
+ override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = null
+ override def allocate(progressIndicator: Float): AllocateResponse = response
+ override def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) = null
+ override def addContainerRequest(req: ContainerRequest) { requests ::= req }
+ override def removeContainerRequest(req: ContainerRequest) {}
+ override def getClusterNodeCount() = 1
+
+ override def init(config: Configuration) {}
+ override def start() {}
+ override def stop() {}
+ override def getName(): String = ""
+ override def getConfig() = null
+ override def getStartTime() = 0L
}
def getAppMasterResponse(reboot: Boolean, containers: List[Container], completed: List[ContainerStatus]) =
new AllocateResponse {
- def getAMResponse = new AMResponse {
- def getReboot(): Boolean = reboot
- def setReboot(reboot: Boolean) {}
- def getResponseId() = 0
- def setResponseId(responseId: Int) {}
- def getAllocatedContainers() = containers
- def setAllocatedContainers(containers: java.util.List[Container]) {}
- def getAvailableResources(): Resource = null
- def setAvailableResources(limit: Resource) {}
- def getCompletedContainersStatuses() = completed
- def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
- def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
- def getUpdatedNodes = null
+ override def getResponseId() = 0
+ override def setResponseId(responseId: Int) {}
+ override def getAllocatedContainers() = containers
+ override def setAllocatedContainers(containers: java.util.List[Container]) {}
+ override def getAvailableResources(): Resource = null
+ override def setAvailableResources(limit: Resource) {}
+ override def getCompletedContainersStatuses() = completed
+ override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
+ override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
+ override def getUpdatedNodes = null
+ override def getNumClusterNodes = 1
+ override def setNumClusterNodes(num: Int) {}
+ override def getNMTokens = null
+ override def setNMTokens(nmTokens: java.util.List[NMToken]) {}
+ override def setAMCommand(command: AMCommand) {}
+ override def getPreemptionMessage = null
+ override def setPreemptionMessage(request: PreemptionMessage) {}
+
+ override def getAMCommand = if (reboot) {
+ AMCommand.AM_RESYNC
+ } else {
+ null
}
- def getNumClusterNodes = 1
- def setNumClusterNodes(num: Int) {}
- def setAMResponse(response: AMResponse) {}
}
}
@@ -169,7 +169,7 @@ class TestSamzaAppMasterTaskManager {
val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
- override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+ override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
// Do nothing.
}
}
@@ -182,13 +182,13 @@ class TestSamzaAppMasterTaskManager {
assert(taskManager.shouldShutdown == false)
// 2. First is from onInit, second is from onContainerCompleted, since it failed.
assertEquals(2, amClient.requests.size)
- assertEquals(0, amClient.release.size)
+ assertEquals(0, amClient.getRelease.size)
assertFalse(taskManager.shouldShutdown)
// Now trigger an AM shutdown since our retry count is 1, and we're failing twice
taskManager.onContainerAllocated(getContainer(container2))
taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
assertEquals(2, amClient.requests.size)
- assertEquals(0, amClient.release.size)
+ assertEquals(0, amClient.getRelease.size)
assertTrue(taskManager.shouldShutdown)
}
@@ -200,7 +200,7 @@ class TestSamzaAppMasterTaskManager {
var containersRequested = 0
var containersStarted = 0
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
- override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+ override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
containersStarted += 1
}
@@ -216,7 +216,7 @@ class TestSamzaAppMasterTaskManager {
taskManager.onInit
assert(taskManager.shouldShutdown == false)
assert(amClient.requests.size == 1)
- assert(amClient.release.size == 0)
+ assert(amClient.getRelease.size == 0)
// allocate container 2
taskManager.onContainerAllocated(getContainer(container2))
@@ -234,12 +234,12 @@ class TestSamzaAppMasterTaskManager {
assert(state.taskPartitions.size == 1)
assert(state.unclaimedTasks.size == 0)
assert(amClient.requests.size == 1)
- assert(amClient.release.size == 1)
- assert(amClient.release(0).equals(container3))
+ assert(amClient.getRelease.size == 1)
+ assert(amClient.getRelease.head.equals(container3))
// reset the helper state, so we can make sure that releasing the container (next step) doesn't request more resources
amClient.requests = List()
- amClient.release = List()
+ amClient.resetRelease
// now release the container, and make sure the AM doesn't ask for more
assert(taskManager.shouldShutdown == false)
@@ -250,14 +250,14 @@ class TestSamzaAppMasterTaskManager {
assert(state.taskPartitions.size == 1)
assert(state.unclaimedTasks.size == 0)
assert(amClient.requests.size == 0)
- assert(amClient.release.size == 0)
+ assert(amClient.getRelease.size == 0)
// pretend container 2 is released due to an NM failure, and make sure that the AM requests a new container
assert(taskManager.shouldShutdown == false)
taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure"))
assert(taskManager.shouldShutdown == false)
assert(amClient.requests.size == 1)
- assert(amClient.release.size == 0)
+ assert(amClient.getRelease.size == 0)
}
@Test
@@ -270,7 +270,7 @@ class TestSamzaAppMasterTaskManager {
state.taskCount = 2
var containersStarted = 0
val taskManager = new SamzaAppMasterTaskManager(clock, newConfig, state, amClient, new YarnConfiguration) {
- override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+ override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
containersStarted += 1
}
}
@@ -280,8 +280,8 @@ class TestSamzaAppMasterTaskManager {
assert(taskManager.shouldShutdown == false)
taskManager.onInit
assert(taskManager.shouldShutdown == false)
- assert(amClient.requests.size == 1)
- assert(amClient.release.size == 0)
+ assert(amClient.requests.size == 2)
+ assert(amClient.getRelease.size == 0)
taskManager.onContainerAllocated(getContainer(container2))
assert(state.neededContainers == 1)
assert(state.runningTasks.size == 1)
@@ -337,7 +337,7 @@ class TestSamzaAppMasterTaskManager {
var containersRequested = 0
var containersStarted = 0
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
- override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+ override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
containersStarted += 1
}
@@ -353,7 +353,7 @@ class TestSamzaAppMasterTaskManager {
taskManager.onInit
assert(taskManager.shouldShutdown == false)
assert(amClient.requests.size == 1)
- assert(amClient.release.size == 0)
+ assert(amClient.getRelease.size == 0)
assert(state.neededContainers == 1)
assert(state.runningTasks.size == 0)
assert(state.taskPartitions.size == 0)
@@ -373,8 +373,8 @@ class TestSamzaAppMasterTaskManager {
assert(containersRequested == 1)
assert(containersStarted == 1)
assert(amClient.requests.size == 1)
- assert(amClient.release.size == 1)
- assert(amClient.release(0).equals(container3))
+ assert(amClient.getRelease.size == 1)
+ assert(amClient.getRelease.head.equals(container3))
}
@Test
@@ -429,6 +429,6 @@ class MockSystemFactory extends SystemFactory {
class MockSinglePartitionManager extends SystemAdmin {
def getPartitions(streamName: String) = Set(new Partition(0))
-
+
def getLastOffsets(streams: java.util.Set[String]) = throw new SamzaException("Need to implement this")
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
index 0040648..98f7844 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.api.records.ContainerStatus
import org.apache.hadoop.yarn.api.records.ResourceRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.AMResponse
import org.apache.hadoop.yarn.api.records.ContainerId
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse