You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/04/25 01:48:38 UTC

samza git commit: SAMZA-433: Rename "task" to "container" in AM and container

Repository: samza
Updated Branches:
  refs/heads/master 88a844b06 -> c37d75270


SAMZA-433: Rename "task" to "container" in AM and container


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c37d7527
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c37d7527
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c37d7527

Branch: refs/heads/master
Commit: c37d75270a44ecae4079a8ccbb53022bdf56c8cb
Parents: 88a844b
Author: Benjamin Fradet <be...@gmail.com>
Authored: Fri Apr 24 16:35:18 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Fri Apr 24 16:35:18 2015 -0700

----------------------------------------------------------------------
 .../resources/scalate/WEB-INF/views/index.scaml |  16 +--
 .../org/apache/samza/config/YarnConfig.scala    |   4 +-
 .../samza/job/yarn/SamzaAppMasterMetrics.scala  |   6 +-
 .../samza/job/yarn/SamzaAppMasterState.scala    |  10 +-
 .../job/yarn/SamzaAppMasterTaskManager.scala    | 130 +++++++++----------
 .../webapp/ApplicationMasterRestServlet.scala   |   2 +-
 .../yarn/TestSamzaAppMasterTaskManager.scala    |  72 +++++-----
 7 files changed, 120 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 2b1aa3e..a874b0e 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -72,7 +72,7 @@
         %tr
           %tr
             %td.key Completed
-            %td= state.completedTasks.toString
+            %td= state.completedContainers.toString
           %tr
             %td.key Needed
             %td= state.neededContainers.toString
@@ -93,9 +93,9 @@
             %th Start Time
             %th Up Time
         %tbody
-          - for((taskId, container) <- state.runningTasks)
+          - for((containerId, container) <- state.runningContainers)
             %tr
-              %td #{taskId.toString}
+              %td #{containerId.toString}
               %td
                 %a(target="_blank" href="http://#{container.nodeHttpAddress}/node/containerlogs/#{container.id.toString}/#{username}")= container.id.toString
               %td
@@ -111,24 +111,24 @@
         %tbody
           %tr
             %td.key Total
-            %td= state.taskCount.toString
+            %td= state.containerCount.toString
           %tr
             %td.key Unclaimed
-            %td= state.unclaimedTasks.size.toString
+            %td= state.unclaimedContainers.size.toString
           %tr
             %td.key Finished
-            %td= state.finishedTasks.size.toString
+            %td= state.finishedContainers.size.toString
 
       %h3 TaskName Assignment
       %table.table.table-striped.table-bordered.tablesorter#taskids-table
         %thead
           %tr
-            %th Task ID
+            %th Task Group ID
             %th TaskName
             %th SystemStreamPartitions
             %th Container
         %tbody
-          - for((containerId, container) <- state.runningTasks)
+          - for((containerId, container) <- state.runningContainers)
             - val containerModel = state.jobCoordinator.jobModel.getContainers.get(containerId)
             - for((taskName, taskModel) <- containerModel.getTasks)
               %tr

http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 03395e2..5da1c35 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -26,7 +26,7 @@ object YarnConfig {
   val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
   val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
   val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
-  val TASK_COUNT = "yarn.container.count"
+  val CONTAINER_COUNT = "yarn.container.count"
   val AM_JVM_OPTIONS = "yarn.am.opts"
   val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
   val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
@@ -47,7 +47,7 @@ class YarnConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
 
-  def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT).map(_.toInt)
+  def getContainerCount: Option[Int] = getOption(YarnConfig.CONTAINER_COUNT).map(_.toInt)
 
   def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index ee2aa32..03acfe1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -65,12 +65,12 @@ class SamzaAppMasterMetrics(
   }).toMap
 
   override def onInit() {
-    val mRunningContainers = newGauge("running-containers", () => state.runningTasks.size)
+    val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
     val mNeededContainers = newGauge("needed-containers", () => state.neededContainers)
-    val mCompletedContainers = newGauge("completed-containers", () => state.completedTasks)
+    val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers)
     val mFailedContainers = newGauge("failed-containers", () => state.failedContainers)
     val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers)
-    val mTasks = newGauge("task-count", () => state.taskCount)
+    val mContainers = newGauge("container-count", () => state.containerCount)
     val mHost = newGauge("http-host", () => state.nodeHost)
     val mTrackingPort = newGauge("http-port", () => state.trackingUrl.getPort)
     val mRpcPort = newGauge("rpc-port", () => state.rpcUrl.getPort)

http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 8ba435e..b1e5546 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -35,14 +35,14 @@ import org.apache.samza.coordinator.JobCoordinator
  */
 class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
   // controlled by the AM
-  var completedTasks = 0
+  var completedContainers = 0
   var neededContainers = 0
   var failedContainers = 0
   var releasedContainers = 0
-  var taskCount = 0
-  var unclaimedTasks = Set[Int]()
-  var finishedTasks = Set[Int]()
-  var runningTasks = Map[Int, YarnContainer]()
+  var containerCount = 0
+  var runningContainers = Map[Int, YarnContainer]()
+  var unclaimedContainers = Set[Int]()
+  var finishedContainers = Set[Int]()
   var jobCoordinator: JobCoordinator = null
   var status = FinalApplicationStatus.UNDEFINED
   var jobHealthy = true

http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index b0b6543..38e1f5f 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -52,7 +52,7 @@ object SamzaAppMasterTaskManager {
   val DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000
 }
 
-case class TaskFailure(val count: Int, val lastFailure: Long)
+case class ContainerFailure(val count: Int, val lastFailure: Long)
 
 /**
  * Samza's application master is mostly interested in requesting containers to
@@ -63,29 +63,29 @@ case class TaskFailure(val count: Int, val lastFailure: Long)
 class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
   import SamzaAppMasterTaskManager._
 
-  state.taskCount = config
-    .getTaskCount
+  state.containerCount = config
+    .getContainerCount
     .getOrElse({
-      info("No %s specified. Defaulting to one container." format YarnConfig.TASK_COUNT)
+      info("No %s specified. Defaulting to one container." format YarnConfig.CONTAINER_COUNT)
       1
     })
-  state.jobCoordinator = JobCoordinator(config, state.taskCount)
+  state.jobCoordinator = JobCoordinator(config, state.containerCount)
 
-  var taskFailures = Map[Int, TaskFailure]()
+  var containerFailures = Map[Int, ContainerFailure]()
   var tooManyFailedContainers = false
   // TODO we might want to use NMClientAsync as well
   var containerManager: NMClient = null
 
-  override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
+  override def shouldShutdown = state.completedContainers == state.containerCount || tooManyFailedContainers
 
   override def onInit() {
-    state.neededContainers = state.taskCount
-    state.unclaimedTasks = state.jobCoordinator.jobModel.getContainers.keySet.map(_.toInt).toSet
+    state.neededContainers = state.containerCount
+    state.unclaimedContainers = state.jobCoordinator.jobModel.getContainers.keySet.map(_.toInt).toSet
     containerManager = NMClient.createNMClient()
     containerManager.init(conf)
     containerManager.start
 
-    info("Requesting %s containers" format state.taskCount)
+    info("Requesting %s containers" format state.containerCount)
 
     requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), state.neededContainers)
   }
@@ -101,22 +101,22 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
 
     info("Got a container from YARN ResourceManager: %s" format container)
 
-    state.unclaimedTasks.headOption match {
-      case Some(taskId) => {
-        info("Got available task id (%d) for container: %s" format (taskId, container))
-        val sspTaskNames = state.jobCoordinator.jobModel.getContainers.get(taskId)
-        info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, taskId))
+    state.unclaimedContainers.headOption match {
+      case Some(containerId) => {
+        info("Got available container ID (%d) for container: %s" format (containerId, container))
+        val sspTaskNames = state.jobCoordinator.jobModel.getContainers.get(containerId)
+        info("Claimed SSP taskNames %s for container ID %s" format (sspTaskNames, containerId))
         val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
         val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
           .setConfig(config)
-          .setId(taskId)
+          .setId(containerId)
           .setUrl(state.coordinatorUrl)
         val command = cmdBuilder.buildCommand
-        info("Task ID %s using command %s" format (taskId, command))
+        info("Container ID %s using command %s" format (containerId, command))
         val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
-        info("Task ID %s using env %s" format (taskId, env))
+        info("Container ID %s using env %s" format (containerId, env))
         val path = new Path(config.getPackagePath.get)
-        info("Starting task ID %s using package path %s" format (taskId, path))
+        info("Starting container ID %s using package path %s" format (containerId, path))
 
         startContainer(
           path,
@@ -128,12 +128,12 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
         if (state.neededContainers == 0) {
           state.jobHealthy = true
         }
-        state.runningTasks += taskId -> new YarnContainer(container)
-        state.unclaimedTasks -= taskId
+        state.runningContainers += containerId -> new YarnContainer(container)
+        state.unclaimedContainers -= containerId
 
-        info("Claimed task ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (taskId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
+        info("Claimed container ID %s for container %s on node %s (http://%s/node/containerlogs/%s)." format (containerId, containerIdStr, container.getNodeId.getHost, container.getNodeHttpAddress, containerIdStr))
 
-        info("Started task ID %s" format taskId)
+        info("Started container ID %s" format containerId)
       }
       case _ => {
         // there are no more tasks to run, so release the container
@@ -146,11 +146,11 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
 
   override def onContainerCompleted(containerStatus: ContainerStatus) {
     val containerIdStr = ConverterUtils.toString(containerStatus.getContainerId)
-    val taskId = state.runningTasks.filter { case (_, container) => container.id.equals(containerStatus.getContainerId()) }.keys.headOption
+    val containerId = state.runningContainers.filter { case (_, container) => container.id.equals(containerStatus.getContainerId()) }.keys.headOption
 
-    taskId match {
-      case Some(taskId) => {
-        state.runningTasks -= taskId
+    containerId match {
+      case Some(containerId) => {
+        state.runningContainers -= containerId
       }
       case _ => None
     }
@@ -159,15 +159,15 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
       case 0 => {
         info("Container %s completed successfully." format containerIdStr)
 
-        state.completedTasks += 1
+        state.completedContainers += 1
 
-        if (taskId.isDefined) {
-          state.finishedTasks += taskId.get
-          taskFailures -= taskId.get
+        if (containerId.isDefined) {
+          state.finishedContainers += containerId.get
+          containerFailures -= containerId.get
         }
 
-        if (state.completedTasks == state.taskCount) {
-          info("Setting job status to SUCCEEDED, since all tasks have been marked as completed.")
+        if (state.completedContainers == state.containerCount) {
+          info("Setting job status to SUCCEEDED, since all containers have been marked as completed.")
           state.status = FinalApplicationStatus.SUCCEEDED
         }
       }
@@ -178,16 +178,16 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
 
         state.releasedContainers += 1
 
-        // If this container was assigned some partitions (a containerId), then 
-        // clean up, and request a new container for the tasks. This only 
-        // should happen if the container was 'lost' due to node failure, not 
+        // If this container was assigned some partitions (a containerId), then
+        // clean up, and request a new container for the tasks. This only
+        // should happen if the container was 'lost' due to node failure, not
         // if the AM released the container.
-        if (taskId.isDefined) {
-          info("Released container %s was assigned task ID %s. Requesting a new container for the task." format (containerIdStr, taskId.get))
+        if (containerId.isDefined) {
+          info("Released container %s was assigned task group ID %s. Requesting a new container for the task group." format (containerIdStr, containerId.get))
 
           state.neededContainers += 1
           state.jobHealthy = false
-          state.unclaimedTasks += taskId.get
+          state.unclaimedContainers += containerId.get
 
           // request a new container
           requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), 1)
@@ -199,33 +199,33 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
         state.failedContainers += 1
         state.jobHealthy = false
 
-        taskId match {
-          case Some(taskId) =>
-            info("Failed container %s owned task id %s." format (containerIdStr, taskId))
+        containerId match {
+          case Some(containerId) =>
+            info("Failed container %s owned task group ID %s." format (containerIdStr, containerId))
 
-            state.unclaimedTasks += taskId
+            state.unclaimedContainers += containerId
             state.neededContainers += 1
 
-            // A container failed for an unknown reason. Let's check to see if 
-            // we need to shutdown the whole app master if too many container 
-            // failures have happened. The rules for failing are that the failure 
-            // count for a task id must be > the configured retry count, and the 
-            // last failure (the one prior to this one) must have happened less 
-            // than retry window ms ago. If retry count is set to 0, the app 
-            // master will fail on any container failure. If the retry count is 
-            // set to a number < 0, a container failure will never trigger an 
-            // app master failure.
+            // A container failed for an unknown reason. Let's check to see if
+            // we need to shutdown the whole app master if too many container
+            // failures have happened. The rules for failing are that the
+            // failure count for a task group id must be > the configured retry
+            // count, and the last failure (the one prior to this one) must have
+            // happened less than retry window ms ago. If retry count is set to
+            // 0, the app master will fail on any container failure. If the
+            // retry count is set to a number < 0, a container failure will
+            // never trigger an app master failure.
             val retryCount = config.getContainerRetryCount.getOrElse(DEFAULT_CONTAINER_RETRY_COUNT)
             val retryWindowMs = config.getContainerRetryWindowMs.getOrElse(DEFAULT_CONTAINER_RETRY_WINDOW_MS)
 
             if (retryCount == 0) {
-              error("Task id %s (%s) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed."
-                format (taskId, containerIdStr))
+              error("Container ID %s (%s) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed."
+                format (containerId, containerIdStr))
 
               tooManyFailedContainers = true
             } else if (retryCount > 0) {
-              val (currentFailCount, lastFailureTime) = taskFailures.get(taskId) match {
-                case Some(TaskFailure(count, lastFailure)) => (count + 1, lastFailure)
+              val (currentFailCount, lastFailureTime) = containerFailures.get(containerId) match {
+                case Some(ContainerFailure(count, lastFailure)) => (count + 1, lastFailure)
                 case _ => (1, 0L)
               }
 
@@ -233,24 +233,24 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
                 val lastFailureMsDiff = clock() - lastFailureTime
 
                 if (lastFailureMsDiff < retryWindowMs) {
-                  error("Task id %s (%s) has failed %s times, with last failure %sms ago. This is greater than retry count of %s and window of %s, so shutting down the application master, and marking the job as failed."
-                    format (taskId, containerIdStr, currentFailCount, lastFailureMsDiff, retryCount, retryWindowMs))
+                  error("Container ID %s (%s) has failed %s times, with last failure %sms ago. This is greater than retry count of %s and window of %s, so shutting down the application master, and marking the job as failed."
+                    format (containerId, containerIdStr, currentFailCount, lastFailureMsDiff, retryCount, retryWindowMs))
 
-                  // We have too many failures, and we're within the window 
+                  // We have too many failures, and we're within the window
                   // boundary, so reset shut down the app master.
                   tooManyFailedContainers = true
                   state.status = FinalApplicationStatus.FAILED
                 } else {
-                  info("Resetting fail count for task id %s back to 1, since last container failure (%s) for this task id was outside the bounds of the retry window."
-                    format (taskId, containerIdStr))
+                  info("Resetting fail count for container ID %s back to 1, since last container failure (%s) for this container ID was outside the bounds of the retry window."
+                    format (containerId, containerIdStr))
 
-                  // Reset counter back to 1, since the last failure for this 
+                  // Reset counter back to 1, since the last failure for this
                   // container happened outside the window boundary.
-                  taskFailures += taskId -> TaskFailure(1, clock())
+                  containerFailures += containerId -> ContainerFailure(1, clock())
                 }
               } else {
-                info("Current fail count for task id %s is %s." format (taskId, currentFailCount))
-                taskFailures += taskId -> TaskFailure(currentFailCount, clock())
+                info("Current fail count for container ID %s is %s." format (containerId, currentFailCount))
+                containerFailures += containerId -> ContainerFailure(currentFailCount, clock())
               }
             }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index 4c855bf..09f4dc3 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -80,7 +80,7 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppMasterState, r
   get("/am") {
     val containers = new HashMap[String, HashMap[String, Object]]
 
-    state.runningTasks.foreach {
+    state.runningContainers.foreach {
       case (containerId, container) =>
         val yarnContainerId = container.id.toString
         val containerMap = new HashMap[String, Object]

http://git-wip-us.apache.org/repos/asf/samza/blob/c37d7527/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 58f2464..ee2d0ea 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -152,10 +152,10 @@ class TestSamzaAppMasterTaskManager {
     "yarn.container.retry.window.ms" -> "1999999999"))
 
   @Test
-  def testAppMasterShouldDefaultToOneContainerIfTaskCountIsNotSpecified {
+  def testAppMasterShouldDefaultToOneContainerIfContainerCountIsNotSpecified {
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
-    assertEquals(1, state.taskCount)
+    assertEquals(1, state.containerCount)
   }
 
   @Test
@@ -166,8 +166,8 @@ class TestSamzaAppMasterTaskManager {
     assertFalse(taskManager.shouldShutdown)
     taskManager.onContainerCompleted(getContainerStatus(state.containerId, 0, ""))
     assertTrue(taskManager.shouldShutdown)
-    assertEquals(1, state.completedTasks)
-    assertEquals(1, state.taskCount)
+    assertEquals(1, state.completedContainers)
+    assertEquals(1, state.containerCount)
     assertTrue(state.jobHealthy)
     assertEquals(FinalApplicationStatus.SUCCEEDED, state.status)
   }
@@ -212,7 +212,7 @@ class TestSamzaAppMasterTaskManager {
     val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
     state.coordinatorUrl = new URL("http://localhost:1234")
-    state.taskCount = 2
+    state.containerCount = 2
     var containersRequested = 0
     var containersStarted = 0
     val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
@@ -237,16 +237,16 @@ class TestSamzaAppMasterTaskManager {
     // allocate container 2
     taskManager.onContainerAllocated(getContainer(container2))
     assertEquals(0, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
     assertEquals(1, containersRequested)
     assertEquals(1, containersStarted)
 
     // allocate an extra container, which the AM doesn't need, and should be released
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
     assertEquals(1, amClient.getClient.requests.size)
     assertEquals(1, amClient.getClient.getRelease.size)
     assertEquals(container3, amClient.getClient.getRelease.head)
@@ -260,8 +260,8 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerCompleted(getContainerStatus(container3, -100, "pretend the container was released"))
     assertFalse(taskManager.shouldShutdown)
     assertEquals(0, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
     assertEquals(0, amClient.getClient.requests.size)
     assertEquals(0, amClient.getClient.getRelease.size)
 
@@ -276,8 +276,8 @@ class TestSamzaAppMasterTaskManager {
     taskManager.onContainerAllocated(getContainer(container2))
     assertEquals(0, state.neededContainers)
     assertTrue(state.jobHealthy)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
   }
 
   @Test
@@ -287,7 +287,7 @@ class TestSamzaAppMasterTaskManager {
     val newConfig = new MapConfig(map)
     val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
     val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
-    state.taskCount = 2
+    state.containerCount = 2
     state.coordinatorUrl = new URL("http://localhost:1234")
     var containersStarted = 0
     val taskManager = new SamzaAppMasterTaskManager(clock, newConfig, state, amClient, new YarnConfiguration) {
@@ -305,43 +305,43 @@ class TestSamzaAppMasterTaskManager {
     assertEquals(0, amClient.getClient.getRelease.size)
     taskManager.onContainerAllocated(getContainer(container2))
     assertEquals(1, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(1, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(1, state.unclaimedContainers.size)
     assertEquals(1, containersStarted)
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
-    assertEquals(2, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(2, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
     assertEquals(2, containersStarted)
 
     // container2 finishes successfully
     taskManager.onContainerCompleted(getContainerStatus(container2, 0, ""))
     assertEquals(0, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
-    assertEquals(1, state.completedTasks)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
+    assertEquals(1, state.completedContainers)
 
     // container3 fails
     taskManager.onContainerCompleted(getContainerStatus(container3, 1, "expected failure here"))
     assertEquals(1, state.neededContainers)
-    assertEquals(0, state.runningTasks.size)
-    assertEquals(1, state.unclaimedTasks.size)
-    assertEquals(1, state.completedTasks)
+    assertEquals(0, state.runningContainers.size)
+    assertEquals(1, state.unclaimedContainers.size)
+    assertEquals(1, state.completedContainers)
     assertFalse(taskManager.shouldShutdown)
 
     // container3 is re-allocated
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
     assertEquals(3, containersStarted)
 
     // container3 finishes sucecssfully
     taskManager.onContainerCompleted(getContainerStatus(container3, 0, ""))
     assertEquals(0, state.neededContainers)
-    assertEquals(0, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
-    assertEquals(2, state.completedTasks)
+    assertEquals(0, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
+    assertEquals(2, state.completedContainers)
     assertTrue(taskManager.shouldShutdown)
   }
 
@@ -371,18 +371,18 @@ class TestSamzaAppMasterTaskManager {
     assertEquals(1, amClient.getClient.requests.size)
     assertEquals(0, amClient.getClient.getRelease.size)
     assertEquals(1, state.neededContainers)
-    assertEquals(0, state.runningTasks.size)
-    assertEquals(1, state.unclaimedTasks.size)
+    assertEquals(0, state.runningContainers.size)
+    assertEquals(1, state.unclaimedContainers.size)
     taskManager.onContainerAllocated(getContainer(container2))
     assertEquals(0, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
     assertEquals(1, containersRequested)
     assertEquals(1, containersStarted)
     taskManager.onContainerAllocated(getContainer(container3))
     assertEquals(0, state.neededContainers)
-    assertEquals(1, state.runningTasks.size)
-    assertEquals(0, state.unclaimedTasks.size)
+    assertEquals(1, state.runningContainers.size)
+    assertEquals(0, state.unclaimedContainers.size)
     assertEquals(1, containersRequested)
     assertEquals(1, containersStarted)
     assertEquals(1, amClient.getClient.requests.size)
@@ -403,7 +403,7 @@ class MockSystemFactory extends SystemFactory {
   }
 
   def getAdmin(systemName: String, config: Config) = {
-    val containerCount = config.getTaskCount.getOrElse(1)
+    val containerCount = config.getContainerCount.getOrElse(1)
     new MockSystemAdmin(containerCount)
   }
 }