You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/04/25 01:48:38 UTC
samza git commit: SAMZA-433: Rename "task" to "container" in AM and container
Repository: samza
Updated Branches:
refs/heads/master 88a844b06 -> c37d75270
SAMZA-433: Rename "task" to "container" in AM and container
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c37d7527
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c37d7527
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c37d7527
Branch: refs/heads/master
Commit: c37d75270a44ecae4079a8ccbb53022bdf56c8cb
Parents: 88a844b
Author: Benjamin Fradet <be...@gmail.com>
Authored: Fri Apr 24 16:35:18 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Fri Apr 24 16:35:18 2015 -0700
----------------------------------------------------------------------
.../resources/scalate/WEB-INF/views/index.scaml | 16 +--
.../org/apache/samza/config/YarnConfig.scala | 4 +-
.../samza/job/yarn/SamzaAppMasterMetrics.scala | 6 +-
.../samza/job/yarn/SamzaAppMasterState.scala | 10 +-
.../job/yarn/SamzaAppMasterTaskManager.scala | 130 +++++++++----------
.../webapp/ApplicationMasterRestServlet.scala | 2 +-
.../yarn/TestSamzaAppMasterTaskManager.scala | 72 +++++-----
7 files changed, 120 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 2b1aa3e..a874b0e 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -72,7 +72,7 @@
%tr
%tr
%td.key Completed
- %td= state.completedTasks.toString
+ %td= state.completedContainers.toString
%tr
%td.key Needed
%td= state.neededContainers.toString
@@ -93,9 +93,9 @@
%th Start Time
%th Up Time
%tbody
- - for((taskId, container) <- state.runningTasks)
+ - for((containerId, container) <- state.runningContainers)
%tr
- %td #{taskId.toString}
+ %td #{containerId.toString}
%td
%a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
%td
@@ -111,24 +111,24 @@
%tbody
%tr
%td.key Total
- %td= state.taskCount.toString
+ %td= state.containerCount.toString
%tr
%td.key Unclaimed
- %td= state.unclaimedTasks.size.toString
+ %td= state.unclaimedContainers.size.toString
%tr
%td.key Finished
- %td= state.finishedTasks.size.toString
+ %td= state.finishedContainers.size.toString
%h3 TaskName Assignment
%table.table.table-striped.table-bordered.tablesorter#taskids-table
%thead
%tr
- %th Task ID
+ %th Task Group ID
%th TaskName
%th SystemStreamPartitions
%th Container
%tbody
- - for((containerId, container) <- state.runningTasks)
+ - for((containerId, container) <- state.runningContainers)
- val containerModel = state.jobCoordinator.jobModel.getContainers.get(containerId)
- for((taskName, taskModel) <- containerModel.getTasks)
%tr
http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 03395e2..5da1c35 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -26,7 +26,7 @@ object YarnConfig {
val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
- val TASK_COUNT = "yarn.container.count"
+ val CONTAINER_COUNT = "yarn.container.count"
val AM_JVM_OPTIONS = "yarn.am.opts"
val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
@@ -47,7 +47,7 @@ class YarnConfig(config: Config) extends ScalaMapConfig(config) {
def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
- def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT).map(_.toInt)
+ def getContainerCount: Option[Int] = getOption(YarnConfig.CONTAINER_COUNT).map(_.toInt)
def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index ee2aa32..03acfe1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -65,12 +65,12 @@ class SamzaAppMasterMetrics(
}).toMap
override def onInit() {
- val mRunningContainers = newGauge("running-containers", () => state.runningTasks.size)
+ val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
val mNeededContainers = newGauge("needed-containers", () => state.neededContainers)
- val mCompletedContainers = newGauge("completed-containers", () => state.completedTasks)
+ val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers)
val mFailedContainers = newGauge("failed-containers", () => state.failedContainers)
val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers)
- val mTasks = newGauge("task-count", () => state.taskCount)
+ val mContainers = newGauge("container-count", () => state.containerCount)
val mHost = newGauge("http-host", () => state.nodeHost)
val mTrackingPort = newGauge("http-port", () => state.trackingUrl.getPort)
val mRpcPort = newGauge("rpc-port", () => state.rpcUrl.getPort)
http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 8ba435e..b1e5546 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -35,14 +35,14 @@ import org.apache.samza.coordinator.JobCoordinator
*/
class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
// controlled by the AM
- var completedTasks = 0
+ var completedContainers = 0
var neededContainers = 0
var failedContainers = 0
var releasedContainers = 0
- var taskCount = 0
- var unclaimedTasks = Set[Int]()
- var finishedTasks = Set[Int]()
- var runningTasks = Map[Int, YarnContainer]()
+ var containerCount = 0
+ var runningContainers = Map[Int, YarnContainer]()
+ var unclaimedContainers = Set[Int]()
+ var finishedContainers = Set[Int]()
var jobCoordinator: JobCoordinator = null
var status = FinalApplicationStatus.UNDEFINED
var jobHealthy = true
http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index b0b6543..38e1f5f 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -52,7 +52,7 @@ object SamzaAppMasterTaskManager {
val DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000
}
-case class TaskFailure(val count: Int, val lastFailure: Long)
+case class ContainerFailure(val count: Int, val lastFailure: Long)
/**
* Samza's application master is mostly interested in requesting containers to
@@ -63,29 +63,29 @@ case class TaskFailure(val count: Int, val lastFailure: Long)
class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
import SamzaAppMasterTaskManager._
- state.taskCount = config
- .getTaskCount
+ state.containerCount = config
+ .getContainerCount
.getOrElse({
- info("No %s specified. Defaulting to one container." format YarnConfig.TASK_COUNT)
+ info("No %s specified. Defaulting to one container." format YarnConfig.CONTAINER_COUNT)
1
})
- state.jobCoordinator = JobCoordinator(config, state.taskCount)
+ state.jobCoordinator = JobCoordinator(config, state.containerCount)
- var taskFailures = Map[Int, TaskFailure]()
+ var containerFailures = Map[Int, ContainerFailure]()
var tooManyFailedContainers = false
// TODO we might want to use NMClientAsync as well
var containerManager: NMClient = null
- override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
+ override def shouldShutdown = state.completedContainers == state.containerCount || tooManyFailedContainers
override def onInit() {
- state.neededContainers = state.taskCount
- state.unclaimedTasks = state.jobCoordinator.jobModel.getContainers.keySet.map(_.toInt).toSet
+ state.neededContainers = state.containerCount
+ state.unclaimedContainers = state.jobCoordinator.jobModel.getContainers.keySet.map(_.toInt).toSet
containerManager = NMClient.createNMClient()
containerManager.init(conf)
containerManager.start
- info("Requesting %s containers" format state.taskCount)
+ info("Requesting %s containers" format state.containerCount)
requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), state.neededContainers)
}
@@ -101,22 +101,22 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
info("Got a container from YARN ResourceManager: %s" format container)
- state.unclaimedTasks.headOption match {
- case Some(taskId) => {
- info("Got available task id (%d) for container: %s" format (taskId, container))
- val sspTaskNames = state.jobCoordinator.jobModel.getContainers.get(taskId)
- info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, taskId))
+ state.unclaimedContainers.headOption match {
+ case Some(containerId) => {
+ info("Got available container ID (%d) for container: %s" format (containerId, container))
+ val sspTaskNames = state.jobCoordinator.jobModel.getContainers.get(containerId)
+ info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, containerId))
val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
.setConfig(config)
- .setId(taskId)
+ .setId(containerId)
.setUrl(state.coordinatorUrl)
val command = cmdBuilder.buildCommand
- info("Task ID %s using command %s" format (taskId, command))
+ info("Container ID %s using command %s" format (containerId, command))
val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
- info("Task ID %s using env %s" format (taskId, env))
+ info("Container ID %s using env %s" format (containerId, env))
val path = new Path(config.getPackagePath.get)
- info("Starting task ID %s using package path %s" format (taskId, path))
+ info("Starting container ID %s using package path %s" format (containerId, path))
startContainer(
path,
@@ -128,12 +128,12 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
if (state.neededContainers == 0) {
state.jobHealthy = true
}
- state.runningTasks += taskId -> new YarnContainer(container)
- state.unclaimedTasks -= taskId
+ state.runningContainers += containerId -> new YarnContainer(container)
+ state.unclaimedContainers -= containerId
- info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
+ info("Claimed container ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (containerId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
- info("Started task ID %s" format taskId)
+ info("Started container ID %s" format containerId)
}
case _ => {
// there are no more tasks to run, so release the container
@@ -146,11 +146,11 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
override def onContainerCompleted(containerStatus: ContainerStatus) {
val containerIdStr = ConverterUtils.toString(containerStatus.getContainerId)
- val taskId = state.runningTasks.filter { case (_, container) => container.id.equals(containerStatus.getContainerId()) }.keys.headOption
+ val containerId = state.runningContainers.filter { case (_, container) => container.id.equals(containerStatus.getContainerId()) }.keys.headOption
- taskId match {
- case Some(taskId) => {
- state.runningTasks -= taskId
+ containerId match {
+ case Some(containerId) => {
+ state.runningContainers -= containerId
}
case _ => None
}
@@ -159,15 +159,15 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
case 0 => {
info("Container %s completed successfully." format containerIdStr)
- state.completedTasks += 1
+ state.completedContainers += 1
- if (taskId.isDefined) {
- state.finishedTasks += taskId.get
- taskFailures -= taskId.get
+ if (containerId.isDefined) {
+ state.finishedContainers += containerId.get
+ containerFailures -= containerId.get
}
- if (state.completedTasks == state.taskCount) {
- info("Setting job status to SUCCEEDED, since all tasks have been marked as completed.")
+ if (state.completedContainers == state.containerCount) {
+ info("Setting job status to SUCCEEDED, since all containers have been marked as completed.")
state.status = FinalApplicationStatus.SUCCEEDED
}
}
@@ -178,16 +178,16 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
state.releasedContainers += 1
- // If this container was assigned some partitions (a containerId), then
- // clean up, and request a new container for the tasks. This only
- // should happen if the container was 'lost' due to node failure, not
+ // If this container was assigned some partitions (a containerId), then
+ // clean up, and request a new container for the tasks. This only
+ // should happen if the container was 'lost' due to node failure, not
// if the AM released the container.
- if (taskId.isDefined) {
- info("Released container %s was assigned task ID %s. Requesting a new container for the task." format (containerIdStr, taskId.get))
+ if (containerId.isDefined) {
+ info("Released container %s was assigned task group ID %s. Requesting a new container for the task group." format (containerIdStr, containerId.get))
state.neededContainers += 1
state.jobHealthy = false
- state.unclaimedTasks += taskId.get
+ state.unclaimedContainers += containerId.get
// request a new container
requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), 1)
@@ -199,33 +199,33 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
state.failedContainers += 1
state.jobHealthy = false
- taskId match {
- case Some(taskId) =>
- info("Failed container %s owned task id %s." format (containerIdStr, taskId))
+ containerId match {
+ case Some(containerId) =>
+ info("Failed container %s owned task group ID %s." format (containerIdStr, containerId))
- state.unclaimedTasks += taskId
+ state.unclaimedContainers += containerId
state.neededContainers += 1
- // A container failed for an unknown reason. Let's check to see if
- // we need to shutdown the whole app master if too many container
- // failures have happened. The rules for failing are that the failure
- // count for a task id must be > the configured retry count, and the
- // last failure (the one prior to this one) must have happened less
- // than retry window ms ago. If retry count is set to 0, the app
- // master will fail on any container failure. If the retry count is
- // set to a number < 0, a container failure will never trigger an
- // app master failure.
+ // A container failed for an unknown reason. Let's check to see if
+ // we need to shutdown the whole app master if too many container
+ // failures have happened. The rules for failing are that the
+ // failure count for a task group id must be > the configured retry
+ // count, and the last failure (the one prior to this one) must have
+ // happened less than retry window ms ago. If retry count is set to
+ // 0, the app master will fail on any container failure. If the
+ // retry count is set to a number < 0, a container failure will
+ // never trigger an app master failure.
val retryCount = config.getContainerRetryCount.getOrElse(DEFAULT_CONTAINER_RETRY_COUNT)
val retryWindowMs = config.getContainerRetryWindowMs.getOrElse(DEFAULT_CONTAINER_RETRY_WINDOW_MS)
if (retryCount == 0) {
- error("Task id %s (%s) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed."
- format (taskId, containerIdStr))
+ error("Container ID %s (%s) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed."
+ format (containerId, containerIdStr))
tooManyFailedContainers = true
} else if (retryCount > 0) {
- val (currentFailCount, lastFailureTime) = taskFailures.get(taskId) match {
- case Some(TaskFailure(count, lastFailure)) => (count + 1, lastFailure)
+ val (currentFailCount, lastFailureTime) = containerFailures.get(containerId) match {
+ case Some(ContainerFailure(count, lastFailure)) => (count + 1, lastFailure)
case _ => (1, 0L)
}
@@ -233,24 +233,24 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
val lastFailureMsDiff = clock() - lastFailureTime
if (lastFailureMsDiff < retryWindowMs) {
- error("Task id %s (%s) has failed %s times, with last failure %sms ago. This is greater than retry count of %s and window of %s, so shutting down the application master, and marking the job as failed."
- format (taskId, containerIdStr, currentFailCount, lastFailureMsDiff, retryCount, retryWindowMs))
+ error("Container ID %s (%s) has failed %s times, with last failure %sms ago. This is greater than retry count of %s and window of %s, so shutting down the application master, and marking the job as failed."
+ format (containerId, containerIdStr, currentFailCount, lastFailureMsDiff, retryCount, retryWindowMs))
- // We have too many failures, and we're within the window
+ // We have too many failures, and we're within the window
// boundary, so shut down the app master.
tooManyFailedContainers = true
state.status = FinalApplicationStatus.FAILED
} else {
- info("Resetting fail count for task id %s back to 1, since last container failure (%s) for this task id was outside the bounds of the retry window."
- format (taskId, containerIdStr))
+ info("Resetting fail count for container ID %s back to 1, since last container failure (%s) for this container ID was outside the bounds of the retry window."
+ format (containerId, containerIdStr))
- // Reset counter back to 1, since the last failure for this
+ // Reset counter back to 1, since the last failure for this
// container happened outside the window boundary.
- taskFailures += taskId -> TaskFailure(1, clock())
+ containerFailures += containerId -> ContainerFailure(1, clock())
}
} else {
- info("Current fail count for task id %s is %s." format (taskId, currentFailCount))
- taskFailures += taskId -> TaskFailure(currentFailCount, clock())
+ info("Current fail count for container ID %s is %s." format (containerId, currentFailCount))
+ containerFailures += containerId -> ContainerFailure(currentFailCount, clock())
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 4c855bf..09f4dc3 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -80,7 +80,7 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r
get("/am") {
val containers = new HashMap[String, HashMap[String, Object]]
- state.runningTasks.foreach {
+ state.runningContainers.foreach {
case (containerId, container) =>
val yarnContainerId = container.id.toString
val containerMap = new HashMap[String, Object]
http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 58f2464..ee2d0ea 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -152,10 +152,10 @@ class TestSamzaAppMasterTaskManager {
"yarn.container.retry.window.ms" -> "1999999999"))
@Test
- def testAppMasterShouldDefaultToOneContainerIfTaskCountIsNotSpecified {
+ def testAppMasterShouldDefaultToOneContainerIfContainerCountIsNotSpecified {
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
- assertEquals(1, state.taskCount)
+ assertEquals(1, state.containerCount)
}
@Test
@@ -166,8 +166,8 @@ class TestSamzaAppMasterTaskManager {
assertFalse(taskManager.shouldShutdown)
taskManager.onContainerCompleted(getContainerStatus(state.containerId, 0, ""))
assertTrue(taskManager.shouldShutdown)
- assertEquals(1, state.completedTasks)
- assertEquals(1, state.taskCount)
+ assertEquals(1, state.completedContainers)
+ assertEquals(1, state.containerCount)
assertTrue(state.jobHealthy)
assertEquals(FinalApplicationStatus.SUCCEEDED, state.status)
}
@@ -212,7 +212,7 @@ class TestSamzaAppMasterTaskManager {
val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.coordinatorUrl = new URL("http://localhost:1234")
- state.taskCount = 2
+ state.containerCount = 2
var containersRequested = 0
var containersStarted = 0
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
@@ -237,16 +237,16 @@ class TestSamzaAppMasterTaskManager {
// allocate container 2
taskManager.onContainerAllocated(getContainer(container2))
assertEquals(0, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
assertEquals(1, containersRequested)
assertEquals(1, containersStarted)
// allocate an extra container, which the AM doesn't need, and should be released
taskManager.onContainerAllocated(getContainer(container3))
assertEquals(0, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
assertEquals(1, amClient.getClient.requests.size)
assertEquals(1, amClient.getClient.getRelease.size)
assertEquals(container3, amClient.getClient.getRelease.head)
@@ -260,8 +260,8 @@ class TestSamzaAppMasterTaskManager {
taskManager.onContainerCompleted(getContainerStatus(container3, -100, "pretend the container was released"))
assertFalse(taskManager.shouldShutdown)
assertEquals(0, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
assertEquals(0, amClient.getClient.requests.size)
assertEquals(0, amClient.getClient.getRelease.size)
@@ -276,8 +276,8 @@ class TestSamzaAppMasterTaskManager {
taskManager.onContainerAllocated(getContainer(container2))
assertEquals(0, state.neededContainers)
assertTrue(state.jobHealthy)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
}
@Test
@@ -287,7 +287,7 @@ class TestSamzaAppMasterTaskManager {
val newConfig = new MapConfig(map)
val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
- state.taskCount = 2
+ state.containerCount = 2
state.coordinatorUrl = new URL("http://localhost:1234")
var containersStarted = 0
val taskManager = new SamzaAppMasterTaskManager(clock, newConfig, state, amClient, new YarnConfiguration) {
@@ -305,43 +305,43 @@ class TestSamzaAppMasterTaskManager {
assertEquals(0, amClient.getClient.getRelease.size)
taskManager.onContainerAllocated(getContainer(container2))
assertEquals(1, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(1, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(1, state.unclaimedContainers.size)
assertEquals(1, containersStarted)
taskManager.onContainerAllocated(getContainer(container3))
assertEquals(0, state.neededContainers)
- assertEquals(2, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(2, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
assertEquals(2, containersStarted)
// container2 finishes successfully
taskManager.onContainerCompleted(getContainerStatus(container2, 0, ""))
assertEquals(0, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
- assertEquals(1, state.completedTasks)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
+ assertEquals(1, state.completedContainers)
// container3 fails
taskManager.onContainerCompleted(getContainerStatus(container3, 1, "expected failure here"))
assertEquals(1, state.neededContainers)
- assertEquals(0, state.runningTasks.size)
- assertEquals(1, state.unclaimedTasks.size)
- assertEquals(1, state.completedTasks)
+ assertEquals(0, state.runningContainers.size)
+ assertEquals(1, state.unclaimedContainers.size)
+ assertEquals(1, state.completedContainers)
assertFalse(taskManager.shouldShutdown)
// container3 is re-allocated
taskManager.onContainerAllocated(getContainer(container3))
assertEquals(0, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
assertEquals(3, containersStarted)
// container3 finishes successfully
taskManager.onContainerCompleted(getContainerStatus(container3, 0, ""))
assertEquals(0, state.neededContainers)
- assertEquals(0, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
- assertEquals(2, state.completedTasks)
+ assertEquals(0, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
+ assertEquals(2, state.completedContainers)
assertTrue(taskManager.shouldShutdown)
}
@@ -371,18 +371,18 @@ class TestSamzaAppMasterTaskManager {
assertEquals(1, amClient.getClient.requests.size)
assertEquals(0, amClient.getClient.getRelease.size)
assertEquals(1, state.neededContainers)
- assertEquals(0, state.runningTasks.size)
- assertEquals(1, state.unclaimedTasks.size)
+ assertEquals(0, state.runningContainers.size)
+ assertEquals(1, state.unclaimedContainers.size)
taskManager.onContainerAllocated(getContainer(container2))
assertEquals(0, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
assertEquals(1, containersRequested)
assertEquals(1, containersStarted)
taskManager.onContainerAllocated(getContainer(container3))
assertEquals(0, state.neededContainers)
- assertEquals(1, state.runningTasks.size)
- assertEquals(0, state.unclaimedTasks.size)
+ assertEquals(1, state.runningContainers.size)
+ assertEquals(0, state.unclaimedContainers.size)
assertEquals(1, containersRequested)
assertEquals(1, containersStarted)
assertEquals(1, amClient.getClient.requests.size)
@@ -403,7 +403,7 @@ class MockSystemFactory extends SystemFactory {
}
def getAdmin(systemName: String, config: Config) = {
- val containerCount = config.getTaskCount.getOrElse(1)
+ val containerCount = config.getContainerCount.getOrElse(1)
new MockSystemAdmin(containerCount)
}
}