You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/07 17:34:41 UTC
git commit: SAMZA-58; Use YARN's AMRMClientAsync client library
Repository: incubator-samza
Updated Branches:
refs/heads/master bd2fb6776 -> 4e5573b17
SAMZA-58; Use YARN's AMRMClientAsync client library
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4e5573b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4e5573b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4e5573b1
Branch: refs/heads/master
Commit: 4e5573b172a76e8d1413efb4152f2976ea54304f
Parents: bd2fb67
Author: Zhijie Shen <zs...@hortonworks.com>
Authored: Mon Apr 7 08:34:32 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Apr 7 08:34:32 2014 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../org/apache/samza/config/YarnConfig.scala | 6 +
.../apache/samza/job/yarn/SamzaAppMaster.scala | 95 +++++++++---
.../job/yarn/SamzaAppMasterLifecycle.scala | 16 +-
.../samza/job/yarn/SamzaAppMasterMetrics.scala | 5 -
.../job/yarn/SamzaAppMasterTaskManager.scala | 59 ++++----
.../apache/samza/job/yarn/YarnAppMaster.scala | 85 -----------
.../samza/job/yarn/YarnAppMasterListener.scala | 5 -
.../samza/job/yarn/TestSamzaAppMaster.scala | 150 +++++++++++++++++++
.../job/yarn/TestSamzaAppMasterLifecycle.scala | 50 +++----
.../yarn/TestSamzaAppMasterTaskManager.scala | 106 ++++++-------
.../samza/job/yarn/TestYarnAppMaster.scala | 142 ------------------
12 files changed, 347 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 75b6c98..b54cee6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -151,6 +151,7 @@ project(":samza-yarn_$scalaVersion") {
exclude module: 'slf4j-api'
}
testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-all:$mockitoVersion"
}
repositories {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 6c3aa92..5756d3a 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -30,6 +30,7 @@ object YarnConfig {
val AM_JVM_OPTIONS = "yarn.am.opts"
val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
+ val AM_POLL_INTERVAL_MS = "yarn.am.poll.interval.ms"
implicit def Config2Yarn(config: Config) = new YarnConfig(config)
}
@@ -69,5 +70,10 @@ class YarnConfig(config: Config) extends ScalaMapConfig(config) {
case _ => None
}
+ def getAMPollIntervalMs: Option[Int] = getOption(YarnConfig.AM_POLL_INTERVAL_MS) match {
+ case Some(interval) => Some(interval.toInt)
+ case _ => None
+ }
+
def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index de6887d..c28c9a6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -18,21 +18,27 @@
*/
package org.apache.samza.job.yarn
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.YarnConfig
-import org.apache.samza.config.serializers.JsonConfigSerializer
+
+import scala.collection.JavaConversions.asScalaBuffer
+
import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus, NodeReport }
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
-import scala.collection.JavaConversions._
-import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap }
-import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
-import org.apache.samza.config.YarnConfig._
-import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
-import org.apache.samza.util.hadoop.HttpFileSystem
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.samza.config.MapConfig
import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.config.serializers.JsonConfigSerializer
+import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CONTAINER_MEM
+import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CPU_CORES
+import org.apache.samza.metrics.JmxServer
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.util.hadoop.HttpFileSystem
+
+import grizzled.slf4j.Logging
/**
* When YARN executes an application master, it needs a bash command to
@@ -45,7 +51,12 @@ import org.apache.samza.config.ShellCommandConfig
* YARN client, and YARN itself), and wires up everything to run Samza's
* application master.
*/
-object SamzaAppMaster extends Logging {
+object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
+ val DEFAULT_POLL_INTERVAL_MS: Int = 1000
+ var state: SamzaAppMasterState = null
+ var listeners: List[YarnAppMasterListener] = null
+ var storedException: Throwable = null
+
def main(args: Array[String]) {
val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
info("got container id: %s" format containerIdStr)
@@ -62,7 +73,8 @@ object SamzaAppMaster extends Logging {
info("got config: %s" format config)
val hConfig = new YarnConfiguration
hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
- val amClient = new AMRMClientImpl[ContainerRequest]
+ val interval = config.getAMPollIntervalMs.getOrElse(DEFAULT_POLL_INTERVAL_MS)
+ val amClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this)
val clientHelper = new ClientHelper(hConfig)
val registry = new MetricsRegistryMap
val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)
@@ -71,14 +83,13 @@ object SamzaAppMaster extends Logging {
try {
// wire up all of the yarn event listeners
- val state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+ state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
val service = new SamzaAppMasterService(config, state, registry, clientHelper)
- val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient, hConfig)
+ val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient)
val metrics = new SamzaAppMasterMetrics(config, state, registry)
val am = new SamzaAppMasterTaskManager({ System.currentTimeMillis }, config, state, amClient, hConfig)
-
- // run the app master
- new YarnAppMaster(List(state, service, lifecycle, metrics, am), amClient).run
+ listeners = List(state, service, lifecycle, metrics, am)
+ run(amClient, listeners, hConfig, interval)
} finally {
// jmxServer has to be stopped or will prevent process from exiting.
if (jmxServer.isDefined) {
@@ -86,4 +97,52 @@ object SamzaAppMaster extends Logging {
}
}
}
+
+ def run(amClient: AMRMClientAsync[ContainerRequest], listeners: List[YarnAppMasterListener], hConfig: YarnConfiguration, interval: Int): Unit = {
+ try {
+ amClient.init(hConfig)
+ amClient.start
+ listeners.foreach(_.onInit)
+ var isShutdown: Boolean = false
+ // have the loop to prevent the process from exiting until the job is to shutdown or error occurs on amClient
+ while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _) && storedException == null) {
+ try {
+ Thread.sleep(interval)
+ } catch {
+ case e: InterruptedException => {
+ isShutdown = true
+ info("got interrupt in app master thread, so shutting down")
+ }
+ }
+ }
+ } finally {
+ // listeners has to be stopped
+ listeners.foreach(listener => try {
+ listener.onShutdown
+ } catch {
+ case e: Exception => warn("Listener %s failed to shutdown." format listener, e)
+ })
+ // amClient has to be stopped
+ amClient.stop
+ }
+ }
+
+ override def onContainersCompleted(statuses: java.util.List[ContainerStatus]): Unit =
+ statuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus)))
+
+ override def onContainersAllocated(containers: java.util.List[Container]): Unit =
+ containers.foreach(container => listeners.foreach(_.onContainerAllocated(container)))
+
+ override def onShutdownRequest: Unit = listeners.foreach(_.onReboot)
+
+ override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = Unit
+
+ // TODO need to think about meaningful SAMZA's progress
+ override def getProgress: Float = 0.0F
+
+ override def onError(e: Throwable): Unit = {
+ error("Error occured in amClient's callback", e)
+ storedException = e
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
index 5d09265..8cb9490 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -18,27 +18,26 @@
*/
package org.apache.samza.job.yarn
-import grizzled.slf4j.Logging
-import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.conf.YarnConfiguration
+
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.samza.SamzaException
+
+import grizzled.slf4j.Logging
/**
* Responsible for managing the lifecycle of the application master. Mostly,
* this means registering and unregistering with the RM, and shutting down
* when the RM tells us to Reboot.
*/
-class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient[_], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest]) extends YarnAppMasterListener with Logging {
var validResourceRequest = true
var shutdownMessage: String = null
override def onInit() {
val host = state.nodeHost
- amClient.init(conf);
- amClient.start
-
val response = amClient.registerApplicationMaster(host, state.rpcPort, "%s:%d" format (host, state.trackingPort))
// validate that the YARN cluster can handle our container resource requirements
@@ -63,7 +62,6 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza
override def onShutdown() {
info("Shutting down.")
amClient.unregisterApplicationMaster(state.status, shutdownMessage, null)
- amClient.stop
}
override def shouldShutdown = !validResourceRequest
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
index 983771d..851aae6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
@@ -51,7 +51,6 @@ class SamzaAppMasterMetrics(
val registry: ReadableMetricsRegistry) extends MetricsHelper with YarnAppMasterListener with Logging {
val jvm = new JvmMetrics(registry)
- val mEventLoops = newCounter("event-loops")
val reporters = config.getMetricReporterNames.map(reporterName => {
val metricsFactoryClassName = config
.getMetricsFactoryClass(reporterName)
@@ -82,10 +81,6 @@ class SamzaAppMasterMetrics(
reporters.values.foreach(_.start)
}
- override def onEventLoop() {
- mEventLoops.inc
- }
-
override def onShutdown() {
reporters.values.foreach(_.stop)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 9058210..418a981 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -18,38 +18,34 @@
*/
package org.apache.samza.job.yarn
-import org.apache.hadoop.yarn.api.records.ContainerStatus
-import org.apache.hadoop.yarn.api.records.Container
-import org.apache.samza.config.Config
-import grizzled.slf4j.Logging
-import org.apache.samza.config.YarnConfig.Config2Yarn
-import org.apache.samza.config.YarnConfig
-import org.apache.samza.job.CommandBuilder
-import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.fs.Path
-import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.samza.util.Util
+
+import java.nio.ByteBuffer
+import java.util.Collections
+
import scala.collection.JavaConversions._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.api.records.Priority
-import org.apache.hadoop.yarn.api.records.Resource
-import org.apache.hadoop.yarn.util.Records
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.LocalResourceType
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import java.util.Collections
-import org.apache.samza.job.ShellCommandBuilder
-import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
-import java.nio.ByteBuffer
-import org.apache.hadoop.yarn.client.api.impl.NMClientImpl
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+import org.apache.samza.config.Config
+import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.config.YarnConfig
+import org.apache.samza.config.YarnConfig.Config2Yarn
+import org.apache.samza.job.CommandBuilder
+import org.apache.samza.job.ShellCommandBuilder
+import org.apache.samza.util.Util
+
+import grizzled.slf4j.Logging
object SamzaAppMasterTaskManager {
val DEFAULT_CONTAINER_MEM = 1024
@@ -66,7 +62,7 @@ case class TaskFailure(val count: Int, val lastFailure: Long)
* containers, handling failures, and notifying the application master that the
* job is done.
*/
-class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
import SamzaAppMasterTaskManager._
state.taskCount = config.getTaskCount match {
@@ -79,14 +75,15 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
val allSystemStreamPartitions = Util.getInputStreamPartitions(config)
var taskFailures = Map[Int, TaskFailure]()
var tooManyFailedContainers = false
- var containerManager: NMClientImpl = null
+ // TODO we might want to use NMClientAsync as well
+ var containerManager: NMClient = null
override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
override def onInit() {
state.neededContainers = state.taskCount
state.unclaimedTasks = (0 until state.taskCount).toSet
- containerManager = new NMClientImpl()
+ containerManager = NMClient.createNMClient()
containerManager.init(conf)
containerManager.start
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
deleted file mode 100644
index e45c177..0000000
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-
-import scala.collection.JavaConversions._
-import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.api.records.AMCommand._
-import org.apache.samza.SamzaException
-
-/**
- * YARN's API is somewhat clunky. Most implementations just sit in a loop, and
- * poll the resource manager every N seconds (see the distributed shell
- * example). To make life slightly better, Samza separates the polling logic
- * from the application master logic, and we convert synchronous polling calls
- * to callbacks, which are more intuitive when dealing with event based
- * paradigms like YARN.
- *
- * <br/><br/>
- *
- * SamzaAppMaster uses this class to wire up all of Samza's application master
- * listeners.
- */
-class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) extends Logging {
- var isShutdown = false
-
- def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) = this(1000, listeners, amClient)
-
- def run {
- try {
- listeners.foreach(_.onInit)
-
- while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _)) {
- val response = amClient.allocate(0)
-
- if (response.getAMCommand != null) {
- response.getAMCommand match {
- case AM_RESYNC | AM_SHUTDOWN =>
- listeners.foreach(_.onReboot)
- case _ =>
- val msg = "Unhandled value of AMCommand: " + response.getAMCommand
- error(msg);
- throw new SamzaException(msg);
- }
- }
-
- listeners.foreach(_.onEventLoop)
- response.getCompletedContainersStatuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus)))
- response.getAllocatedContainers.foreach(container => listeners.foreach(_.onContainerAllocated(container)))
-
- try {
- Thread.sleep(pollIntervalMs)
- } catch {
- case e: InterruptedException => {
- isShutdown = true
- info("got interrupt in app master thread, so shutting down")
- }
- }
- }
- } finally {
- listeners.foreach(listener => try {
- listener.onShutdown
- } catch {
- case e: Exception => warn("Listener %s failed to shutdown." format listener, e)
- })
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
index 1353e86..6bf3046 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala
@@ -65,9 +65,4 @@ trait YarnAppMasterListener {
*/
def onContainerCompleted(containerStatus: ContainerStatus) {}
- /**
- * Invoked by YarnAppMaster once per listener, every time it loops around to
- * poll the RM again.
- */
- def onEventLoop() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
new file mode 100644
index 0000000..190ce28
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMaster.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
+
+import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus }
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.junit.Test
+
+import TestSamzaAppMasterTaskManager._
+
+class TestSamzaAppMaster {
+ @Test
+ def testAppMasterShouldShutdown {
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
+ val listener = new YarnAppMasterListener {
+ var init = 0
+ var shutdown = 0
+ var allocated = 0
+ var complete = 0
+ override def shouldShutdown = true
+ override def onInit() {
+ init += 1
+ }
+ override def onShutdown() {
+ shutdown += 1
+ }
+ override def onContainerAllocated(container: Container) {
+ allocated += 1
+ }
+ override def onContainerCompleted(containerStatus: ContainerStatus) {
+ complete += 1
+ }
+ }
+ SamzaAppMaster.listeners = List(listener)
+ SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+ assert(listener.init == 1)
+ assert(listener.shutdown == 1)
+ }
+
+ @Test
+ def testAppMasterShouldShutdownWithFailingListener {
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
+ val listener1 = new YarnAppMasterListener {
+ var shutdown = 0
+ override def shouldShutdown = true
+ override def onShutdown() {
+ shutdown += 1
+ throw new RuntimeException("Some weird failure")
+ }
+ }
+ val listener2 = new YarnAppMasterListener {
+ var shutdown = 0
+ override def shouldShutdown = true
+ override def onShutdown() {
+ shutdown += 1
+ }
+ }
+ // listener1 will throw an exception in shutdown, and listener2 should still get called
+ SamzaAppMaster.listeners = List(listener1, listener2)
+ SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+ assert(listener1.shutdown == 1)
+ assert(listener2.shutdown == 1)
+ }
+
+ @Test
+ def testAppMasterShouldShutdownWithInterrupt {
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
+ val listener = new YarnAppMasterListener {
+ var init = 0
+ var shutdown = 0
+ override def shouldShutdown = false
+ override def onInit() {
+ init += 1
+ }
+ override def onShutdown() {
+ shutdown += 1
+ }
+ }
+ val thread = new Thread {
+ override def run {
+ SamzaAppMaster.listeners = List(listener)
+ SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+ }
+ }
+ thread.start
+ thread.interrupt
+ thread.join
+ assert(listener.init == 1)
+ assert(listener.shutdown == 1)
+ }
+
+ @Test
+ def testAppMasterShouldForwardAllocatedAndCompleteContainers {
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(getContainer(null)), List(getContainerStatus(null, 1, null)))))
+ val listener = new YarnAppMasterListener {
+ var allocated = 0
+ var complete = 0
+ override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "")
+ override def shouldShutdown = (allocated >= 1 && complete >= 1)
+ override def onContainerAllocated(container: Container) {
+ allocated += 1
+ }
+ override def onContainerCompleted(containerStatus: ContainerStatus) {
+ complete += 1
+ }
+ }
+ SamzaAppMaster.listeners = List(listener)
+ SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+ // heartbeat may be triggered for more than once
+ assert(listener.allocated >= 1)
+ assert(listener.complete >= 1)
+ }
+
+ @Test
+ def testAppMasterShouldReboot {
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(true, List(), List())))
+ val listener = new YarnAppMasterListener {
+ var reboot = 0
+ override def onInit(): Unit = amClient.registerApplicationMaster("", -1, "")
+ override def shouldShutdown = reboot >= 1
+ override def onReboot() {
+ reboot += 1
+ }
+ }
+ SamzaAppMaster.listeners = List(listener)
+ SamzaAppMaster.run(amClient, SamzaAppMaster.listeners, new YarnConfiguration, 1)
+ // heartbeat may be triggered for more than once
+ assert(listener.reboot >= 1)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 4ee77e8..cce63eb 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -18,28 +18,26 @@
*/
package org.apache.samza.job.yarn
-import org.junit.Assert._
-import org.junit.Test
+
+import java.nio.ByteBuffer
+
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
+
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.api.records.ContainerId
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
-import org.apache.hadoop.yarn.api.records.Priority
-import org.apache.hadoop.yarn.api.records.Resource
+import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
-import org.apache.hadoop.yarn.api.records.Resource
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.service._
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync.CallbackHandler
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
+import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
-import java.nio.ByteBuffer
+import org.junit.Assert._
+import org.junit.Test
+import org.mockito.Mockito
class TestSamzaAppMasterLifecycle {
- val amClient = new AMRMClientImpl() {
+ val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
var host = ""
var port = 0
var status: FinalApplicationStatus = null
@@ -61,7 +59,6 @@ class TestSamzaAppMasterLifecycle {
override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
}
}
- override def allocate(progressIndicator: Float): AllocateResponse = null
override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
appMessage: String,
appTrackingUrl: String) {
@@ -70,19 +67,16 @@ class TestSamzaAppMasterLifecycle {
override def releaseAssignedContainer(containerId: ContainerId) {}
override def getClusterNodeCount() = 1
- override def init(config: Configuration) {}
- override def start() {}
- override def stop() {}
- override def getName(): String = ""
- override def getConfig() = null
- override def getStartTime() = 0L
+ override def serviceInit(config: Configuration) {}
+ override def serviceStart() {}
+ override def serviceStop() {}
}
@Test
def testLifecycleShouldRegisterOnInit {
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
state.rpcPort = 1
- val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient, new YarnConfiguration)
+ val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient)
saml.onInit
assert(amClient.host == "test")
assert(amClient.port == 1)
@@ -93,7 +87,7 @@ class TestSamzaAppMasterLifecycle {
def testLifecycleShouldUnregisterOnShutdown {
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.status = FinalApplicationStatus.SUCCEEDED
- new SamzaAppMasterLifecycle(128, 1, state, amClient, new YarnConfiguration).onShutdown
+ new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown
assert(amClient.status == FinalApplicationStatus.SUCCEEDED)
}
@@ -101,7 +95,7 @@ class TestSamzaAppMasterLifecycle {
def testLifecycleShouldThrowAnExceptionOnReboot {
var gotException = false
try {
- new SamzaAppMasterLifecycle(368, 1, null, amClient, new YarnConfiguration).onReboot
+ new SamzaAppMasterLifecycle(368, 1, null, amClient).onReboot
} catch {
// expected
case e: SamzaException => gotException = true
@@ -113,8 +107,8 @@ class TestSamzaAppMasterLifecycle {
def testLifecycleShouldShutdownOnInvalidContainerSettings {
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
state.rpcPort = 1
- List(new SamzaAppMasterLifecycle(768, 1, state, amClient, new YarnConfiguration),
- new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration)).map(saml => {
+ List(new SamzaAppMasterLifecycle(768, 1, state, amClient),
+ new SamzaAppMasterLifecycle(368, 3, state, amClient)).map(saml => {
saml.onInit
assertTrue(saml.shouldShutdown)
})
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 9d832ae..7fd80d5 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -18,27 +18,33 @@
*/
package org.apache.samza.job.yarn
-import org.junit.Assert._
-import org.junit.Test
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.Partition
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.util.ConverterUtils
+
+import scala.annotation.elidable
+import scala.annotation.elidable.ASSERTION
import scala.collection.JavaConversions._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
+
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
+import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.yarn.api.records.NodeReport
-import TestSamzaAppMasterTaskManager._
-import org.apache.samza.system.{SystemStreamPartition, SystemFactory}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.samza.Partition
+import org.apache.samza.config.Config
+import org.apache.samza.config.MapConfig
import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.Util
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
+import org.apache.samza.util.Util
+import org.junit.Assert._
+import org.junit.Test
+
+import TestSamzaAppMasterTaskManager._
object TestSamzaAppMasterTaskManager {
def getContainer(containerId: ContainerId) = new Container {
@@ -80,7 +86,11 @@ object TestSamzaAppMasterTaskManager {
override def setDiagnostics(diagnostics: String) = {}
}
- def getAmClient = (response: AllocateResponse) => new AMRMClientImpl[ContainerRequest] {
+ def getAmClient = (amClient: TestAMRMClientImpl) => new AMRMClientAsyncImpl(amClient, 1, SamzaAppMaster) {
+ def getClient: TestAMRMClientImpl = amClient
+ }
+
+ class TestAMRMClientImpl(response: AllocateResponse) extends AMRMClientImpl[ContainerRequest] {
var requests: List[ContainerRequest] = List[ContainerRequest]()
def getRelease = release
@@ -91,13 +101,9 @@ object TestSamzaAppMasterTaskManager {
override def addContainerRequest(req: ContainerRequest) { requests ::= req }
override def removeContainerRequest(req: ContainerRequest) {}
override def getClusterNodeCount() = 1
-
- override def init(config: Configuration) {}
- override def start() {}
- override def stop() {}
- override def getName(): String = ""
- override def getConfig() = null
- override def getStartTime() = 0L
+ override def serviceInit(config: Configuration) {}
+ override def serviceStart() {}
+ override def serviceStop() {}
}
def getAppMasterResponse(reboot: Boolean, containers: List[Container], completed: List[ContainerStatus]) =
@@ -111,10 +117,10 @@ object TestSamzaAppMasterTaskManager {
override def getCompletedContainersStatuses() = completed
override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
- override def getUpdatedNodes = null
+ override def getUpdatedNodes = List[NodeReport]()
override def getNumClusterNodes = 1
override def setNumClusterNodes(num: Int) {}
- override def getNMTokens = null
+ override def getNMTokens = List[NMToken]()
override def setNMTokens(nmTokens: java.util.List[NMToken]) {}
override def setAMCommand(command: AMCommand) {}
override def getPreemptionMessage = null
@@ -164,7 +170,7 @@ class TestSamzaAppMasterTaskManager {
@Test
def testAppMasterShouldRequestANewContainerWhenATaskFails {
- val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
@@ -179,20 +185,20 @@ class TestSamzaAppMasterTaskManager {
taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
assert(taskManager.shouldShutdown == false)
// 2. First is from onInit, second is from onContainerCompleted, since it failed.
- assertEquals(2, amClient.requests.size)
- assertEquals(0, amClient.getRelease.size)
+ assertEquals(2, amClient.getClient.requests.size)
+ assertEquals(0, amClient.getClient.getRelease.size)
assertFalse(taskManager.shouldShutdown)
// Now trigger an AM shutdown since our retry count is 1, and we're failing twice
taskManager.onContainerAllocated(getContainer(container2))
taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
- assertEquals(2, amClient.requests.size)
- assertEquals(0, amClient.getRelease.size)
+ assertEquals(2, amClient.getClient.requests.size)
+ assertEquals(0, amClient.getClient.getRelease.size)
assertTrue(taskManager.shouldShutdown)
}
@Test
def testAppMasterShouldRequestANewContainerWhenATaskIsReleased {
- val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.taskCount = 2
var containersRequested = 0
@@ -213,8 +219,8 @@ class TestSamzaAppMasterTaskManager {
assert(taskManager.shouldShutdown == false)
taskManager.onInit
assert(taskManager.shouldShutdown == false)
- assert(amClient.requests.size == 1)
- assert(amClient.getRelease.size == 0)
+ assert(amClient.getClient.requests.size == 1)
+ assert(amClient.getClient.getRelease.size == 0)
// allocate container 2
taskManager.onContainerAllocated(getContainer(container2))
@@ -231,13 +237,13 @@ class TestSamzaAppMasterTaskManager {
assert(state.runningTasks.size == 1)
assert(state.taskPartitions.size == 1)
assert(state.unclaimedTasks.size == 0)
- assert(amClient.requests.size == 1)
- assert(amClient.getRelease.size == 1)
- assert(amClient.getRelease.head.equals(container3))
+ assert(amClient.getClient.requests.size == 1)
+ assert(amClient.getClient.getRelease.size == 1)
+ assert(amClient.getClient.getRelease.head.equals(container3))
// reset the helper state, so we can make sure that releasing the container (next step) doesn't request more resources
- amClient.requests = List()
- amClient.resetRelease
+ amClient.getClient.requests = List()
+ amClient.getClient.resetRelease
// now release the container, and make sure the AM doesn't ask for more
assert(taskManager.shouldShutdown == false)
@@ -247,15 +253,15 @@ class TestSamzaAppMasterTaskManager {
assert(state.runningTasks.size == 1)
assert(state.taskPartitions.size == 1)
assert(state.unclaimedTasks.size == 0)
- assert(amClient.requests.size == 0)
- assert(amClient.getRelease.size == 0)
+ assert(amClient.getClient.requests.size == 0)
+ assert(amClient.getClient.getRelease.size == 0)
// pretend container 2 is released due to an NM failure, and make sure that the AM requests a new container
assert(taskManager.shouldShutdown == false)
taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure"))
assert(taskManager.shouldShutdown == false)
- assert(amClient.requests.size == 1)
- assert(amClient.getRelease.size == 0)
+ assert(amClient.getClient.requests.size == 1)
+ assert(amClient.getClient.getRelease.size == 0)
}
@Test
@@ -263,7 +269,7 @@ class TestSamzaAppMasterTaskManager {
val map = new java.util.HashMap[String, String](config)
map.put("yarn.container.count", "2")
val newConfig = new MapConfig(map)
- val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.taskCount = 2
var containersStarted = 0
@@ -278,8 +284,8 @@ class TestSamzaAppMasterTaskManager {
assert(taskManager.shouldShutdown == false)
taskManager.onInit
assert(taskManager.shouldShutdown == false)
- assert(amClient.requests.size == 2)
- assert(amClient.getRelease.size == 0)
+ assert(amClient.getClient.requests.size == 2)
+ assert(amClient.getClient.getRelease.size == 0)
taskManager.onContainerAllocated(getContainer(container2))
assert(state.neededContainers == 1)
assert(state.runningTasks.size == 1)
@@ -330,7 +336,7 @@ class TestSamzaAppMasterTaskManager {
@Test
def testAppMasterShouldReleaseExtraContainers {
- val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
+ val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
var containersRequested = 0
var containersStarted = 0
@@ -350,8 +356,8 @@ class TestSamzaAppMasterTaskManager {
assert(taskManager.shouldShutdown == false)
taskManager.onInit
assert(taskManager.shouldShutdown == false)
- assert(amClient.requests.size == 1)
- assert(amClient.getRelease.size == 0)
+ assert(amClient.getClient.requests.size == 1)
+ assert(amClient.getClient.getRelease.size == 0)
assert(state.neededContainers == 1)
assert(state.runningTasks.size == 0)
assert(state.taskPartitions.size == 0)
@@ -370,9 +376,9 @@ class TestSamzaAppMasterTaskManager {
assert(state.unclaimedTasks.size == 0)
assert(containersRequested == 1)
assert(containersStarted == 1)
- assert(amClient.requests.size == 1)
- assert(amClient.getRelease.size == 1)
- assert(amClient.getRelease.head.equals(container3))
+ assert(amClient.getClient.requests.size == 1)
+ assert(amClient.getClient.getRelease.size == 1)
+ assert(amClient.getClient.getRelease.head.equals(container3))
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4e5573b1/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
deleted file mode 100644
index 98f7844..0000000
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.job.yarn
-import org.junit.Assert._
-import org.junit.Test
-import TestSamzaAppMasterTaskManager._
-import org.apache.hadoop.yarn.api.records.Container
-import org.apache.hadoop.yarn.api.records.ContainerStatus
-import org.apache.hadoop.yarn.api.records.ResourceRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ContainerId
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-
-class TestYarnAppMaster {
- @Test
- def testAppMasterShouldShutdown {
- val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
- val listener = new YarnAppMasterListener {
- var init = 0
- var shutdown = 0
- var allocated = 0
- var complete = 0
- override def shouldShutdown = true
- override def onInit() {
- init += 1
- }
- override def onShutdown() {
- shutdown += 1
- }
- override def onContainerAllocated(container: Container) {
- allocated += 1
- }
- override def onContainerCompleted(containerStatus: ContainerStatus) {
- complete += 1
- }
- }
- new YarnAppMaster(List(listener), amClient).run
- assert(listener.init == 1)
- assert(listener.shutdown == 1)
- }
-
- @Test
- def testAppMasterShouldShutdownWithFailingListener {
- val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
- val listener1 = new YarnAppMasterListener {
- var shutdown = 0
- override def shouldShutdown = true
- override def onShutdown() {
- shutdown += 1
- throw new RuntimeException("Some weird failure")
- }
- }
- val listener2 = new YarnAppMasterListener {
- var shutdown = 0
- override def shouldShutdown = true
- override def onShutdown() {
- shutdown += 1
- }
- }
- // listener1 will throw an exception in shutdown, and listener2 should still get called
- new YarnAppMaster(List(listener1, listener2), amClient).run
- assert(listener1.shutdown == 1)
- assert(listener2.shutdown == 1)
- }
-
- @Test
- def testAppMasterShouldShutdownWithInterrupt {
- val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
- val listener = new YarnAppMasterListener {
- var init = 0
- var shutdown = 0
- override def shouldShutdown = false
- override def onInit() {
- init += 1
- }
- override def onShutdown() {
- shutdown += 1
- }
- }
- val am = new YarnAppMaster(List(listener), amClient)
- val thread = new Thread {
- override def run {
- am.run
- }
- }
- thread.start
- thread.interrupt
- thread.join
- assert(listener.init == 1)
- assert(listener.shutdown == 1)
- }
-
- @Test
- def testAppMasterShouldForwardAllocatedAndCompleteContainers {
- val amClient = getAmClient(getAppMasterResponse(false, List(getContainer(null)), List(getContainerStatus(null, 1, null))))
- val listener = new YarnAppMasterListener {
- var allocated = 0
- var complete = 0
- override def shouldShutdown = (allocated == 1 && complete == 1)
- override def onContainerAllocated(container: Container) {
- allocated += 1
- }
- override def onContainerCompleted(containerStatus: ContainerStatus) {
- complete += 1
- }
- }
- new YarnAppMaster(List(listener), amClient).run
- assert(listener.allocated == 1)
- assert(listener.complete == 1)
- }
-
- @Test
- def testAppMasterShouldReboot {
- val amClient = getAmClient(getAppMasterResponse(true, List(), List()))
- val listener = new YarnAppMasterListener {
- var reboot = 0
- override def shouldShutdown = reboot == 1
- override def onReboot() {
- reboot += 1
- }
- }
- new YarnAppMaster(List(listener), amClient).run
- assert(listener.reboot == 1)
- }
-}