You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:31 UTC
[21/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
new file mode 100644
index 0000000..7e5d9ec
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import java.io.File
+import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.mapreduce.security.TokenCache
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Factory for the YARN `ContainerLaunchContext` records used by this package: it sets the
+ * launch command, the CLASSPATH environment, the security tokens and the local resources.
+ */
+private[glue]
+object ContainerLaunchContext {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  /**
+   * Creates a launch context for a single command.
+   *
+   * @param yarnConf YARN configuration used to resolve the classpath, file system and tokens
+   * @param command command set on the launch context
+   * @param packagePath path registered as the ARCHIVE resource named "pack"
+   * @param configPath path registered as the FILE resource named "conf"
+   * @return the populated launch context record
+   */
+  def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String)
+    : ContainerLaunchContext = {
+    val context = Records.newRecord(classOf[ContainerLaunchContext])
+    context.setCommands(Seq(command).asJava)
+    context.setEnvironment(getAppEnv(yarnConf).asJava)
+    context.setTokens(getToken(yarnConf, packagePath, configPath))
+    context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava)
+    context
+  }
+
+  private def getFs(yarnConf: YarnConfiguration) = YarnFileSystem.get(yarnConf)
+
+  /**
+   * Builds the container environment. CLASSPATH is the configured YARN application classpath
+   * (or its default) followed by every entry under the container working directory.
+   */
+  private def getAppEnv(yarnConf: YarnConfiguration): Map[String, String] = {
+    val classPaths = yarnConf.getStrings(
+      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.mkString(File.pathSeparator))
+
+    // `:+` returns a new array instead of mutating the receiver. The previous code dropped that
+    // result, so the working-directory entry never made it into CLASSPATH.
+    val workDirEntry = Environment.PWD.$() + File.separator + "*" + File.pathSeparator
+    val allPaths = Option(classPaths).getOrElse(Array("")) :+ workDirEntry
+
+    Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator))
+  }
+
+  /** Describes the package archive ("pack") and the config file ("conf") as local resources. */
+  private def getAMLocalResourcesMap(
+      yarnConf: YarnConfiguration, packagePath: String, configPath: String)
+    : Map[String, LocalResource] = {
+    val fs = getFs(yarnConf)
+
+    Map(
+      "pack" -> newYarnAppResource(fs, new Path(packagePath),
+        LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION),
+      "conf" -> newYarnAppResource(fs, new Path(configPath),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION))
+  }
+
+  /**
+   * Creates a `LocalResource` record for `path`, using the size and modification time that the
+   * file system currently reports for the qualified path.
+   */
+  private def newYarnAppResource(
+      fs: YarnFileSystem, path: Path,
+      resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
+    val qualified = fs.makeQualified(path)
+    val status = fs.getFileStatus(qualified)
+    val resource = Records.newRecord(classOf[LocalResource])
+    resource.setType(resourceType)
+    resource.setVisibility(vis)
+    resource.setResource(ConverterUtils.getYarnUrlFromPath(qualified))
+    resource.setTimestamp(status.getModificationTime)
+    resource.setSize(status.getLen)
+    resource
+  }
+
+  /**
+   * Serializes the current user's credentials, after obtaining namenode tokens for the package
+   * and config paths, into a buffer suitable for `setTokens`.
+   */
+  private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String)
+    : ByteBuffer = {
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    val dob = new DataOutputBuffer
+    val dirs = Array(new Path(packagePath), new Path(configPath))
+    TokenCache.obtainTokensForNamenodes(credentials, dirs, yc)
+    credentials.writeTokenStorageToStream(dob)
+    // getData exposes the whole backing array; only the first getLength bytes were written.
+    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala
new file mode 100644
index 0000000..dcb53e9
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import java.io.{InputStream, OutputStream}
+import java.net.ConnectException
+
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.fs.Path
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Small wrapper around the Hadoop file system resolved from `yarnConfig.conf`. The file
+ * operations go through `exceptionHandler`, which logs a configuration hint when a
+ * `ConnectException` is raised and then rethrows the original exception.
+ */
+class FileSystem(yarnConfig: YarnConfig) {
+
+  private val conf = yarnConfig.conf
+  private val fs = org.apache.hadoop.fs.FileSystem.get(conf)
+
+  private def LOG = LogUtil.getLogger(getClass)
+
+  /** Opens `file` for reading. */
+  def open(file: String): InputStream = exceptionHandler {
+    fs.open(new Path(file))
+  }
+
+  /** Creates `file` and returns a stream for writing to it. */
+  def create(file: String): OutputStream = exceptionHandler {
+    fs.create(new Path(file))
+  }
+
+  /** Returns whether `file` exists on the underlying file system. */
+  def exists(file: String): Boolean = exceptionHandler {
+    fs.exists(new Path(file))
+  }
+
+  /** URI of the underlying file system, as a string. */
+  def name: String = fs.getUri.toString
+
+  /** Home directory reported by the underlying file system, as a string. */
+  def getHomeDirectory: String = fs.getHomeDirectory.toString
+
+  /**
+   * Evaluates `call`, logging a hint about the Hadoop configuration when it fails with a
+   * `ConnectException`. The failure is always rethrown unchanged.
+   */
+  private def exceptionHandler[T](call: => T): T = {
+    Try(call) match {
+      case Success(v) => v
+      case Failure(ex) =>
+        ex match {
+          case _: ConnectException =>
+            // Only the segment containing $name is interpolated; "$HADOOP_HOME" must stay
+            // literal text, so it lives in a plain string.
+            LOG.error("Please check whether we connect to the right HDFS file system, " +
+              s"current file system is $name." + "\n. Please copy all configs under " +
+              "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " +
+              "so that we can use the right File system.", ex)
+          case _ =>
+        }
+        throw ex
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
new file mode 100644
index 0000000..59f3832
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import java.nio.ByteBuffer
+
+import akka.actor.ActorRef
+import com.typesafe.config.Config
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource}
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+/**
+ * Adapter for the node manager client. Container lifecycle callbacks are logged, and a
+ * started container is reported to the actor passed to `start`.
+ */
+class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.CallbackHandler {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  private var reportTo: ActorRef = _
+  private var client: NMClientAsyncImpl = _
+
+  /** Remembers the actor to notify, then creates, initializes and starts the async client. */
+  def start(reportTo: ActorRef): Unit = {
+    LOG.info("Starting Node Manager Client NMClient...")
+    this.reportTo = reportTo
+    val nmClient = new NMClientAsyncImpl(this)
+    client = nmClient
+    nmClient.init(yarnConf.conf)
+    nmClient.start()
+  }
+
+  private[glue]
+  override def onContainerStarted(
+      containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer])
+    : Unit = {
+    LOG.info(s"Container started : $containerId, $allServiceResponse")
+    reportTo ! ContainerStarted(containerId)
+  }
+
+  private[glue]
+  override def onContainerStatusReceived(
+      containerId: YarnContainerId, containerStatus: YarnContainerStatus): Unit = {
+    LOG.info(s"Container status received : $containerId, status $containerStatus")
+  }
+
+  private[glue]
+  override def onContainerStopped(containerId: YarnContainerId): Unit = {
+    LOG.error(s"Container stopped : $containerId")
+  }
+
+  private[glue]
+  override def onGetContainerStatusError(
+      containerId: YarnContainerId, throwable: Throwable): Unit = {
+    LOG.error(s"Container exception : $containerId", throwable)
+  }
+
+  private[glue]
+  override def onStartContainerError(
+      containerId: YarnContainerId, throwable: Throwable): Unit = {
+    LOG.error(s"Container exception : $containerId", throwable)
+  }
+
+  private[glue]
+  override def onStopContainerError(
+      containerId: YarnContainerId, throwable: Throwable): Unit = {
+    LOG.error(s"Container exception : $containerId", throwable)
+  }
+
+  /**
+   * Builds a launch context for `command` with the given package and config paths, and asks
+   * the node manager to start it on `container`.
+   */
+  def launchCommand(
+      container: Container, command: String, packagePath: String, configPath: String): Unit = {
+    val containerId = container.getId
+    val host = container.getNodeId.getHost
+    LOG.info(s"Launching command : $command on container: $containerId, host ip : $host")
+    val launchContext = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath)
+    client.startContainerAsync(container, launchContext)
+  }
+
+  /** Asks the node manager on `nodeId` to stop `containerId`. */
+  def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = {
+    LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} ")
+    client.stopContainerAsync(containerId, nodeId)
+  }
+
+  /** Stops the underlying async client. */
+  def stop(): Unit = {
+    LOG.info("Shutdown NMClient")
+    client.stop()
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
new file mode 100644
index 0000000..629e233
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import akka.actor.ActorRef
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+
+/**
+ * Adapter for the resource manager client. Callbacks from the async client are translated
+ * into messages sent to the actor passed to `start`.
+ */
+class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  private var reportTo: ActorRef = null
+  private var client: AMRMClientAsync[ContainerRequest] = null
+
+  /** Remembers the actor to notify and starts the async resource manager client. */
+  def start(reportTo: ActorRef): Unit = {
+    LOG.info("Starting Resource Manager Client RMClient...")
+    this.reportTo = reportTo
+    client = startAMRMClient
+  }
+
+  private def startAMRMClient: AMRMClientAsync[ContainerRequest] = {
+    val timeIntervalMs = 1000 // ms
+    val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this)
+    amrmClient.init(yarnConf.conf)
+    amrmClient.start()
+    amrmClient
+  }
+
+  override def getProgress: Float = 0.5F
+
+  // Ids already forwarded to `reportTo`, so a container is reported at most once.
+  private var allocatedContainers = Set.empty[YarnContainerId]
+
+  private[glue]
+  override def onContainersAllocated(containers: java.util.List[YarnContainer]): Unit = {
+    val newContainers = containers.asScala.toList.filterNot(container =>
+      allocatedContainers.contains(container.getId))
+    allocatedContainers ++= newContainers.map(_.getId)
+    LOG.info(s"New allocated ${newContainers.size} containers")
+    reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_)))
+  }
+
+  private[glue]
+  override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus])
+    : Unit = {
+    LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}")
+    reportTo ! ContainersCompleted(
+      completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
+  }
+
+  private[glue]
+  override def onError(ex: Throwable): Unit = {
+    // Log at error level and keep the throwable so the cause shows up in the log.
+    LOG.error("Error occurred", ex)
+    reportTo ! ResourceManagerException(ex)
+  }
+
+  private[glue]
+  override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = {
+    LOG.info("onNodesUpdates")
+  }
+
+  private[glue]
+  override def onShutdownRequest(): Unit = {
+    LOG.info("Shutdown requested")
+    reportTo ! ShutdownApplication
+  }
+
+  /** Adds one container request per entry of `containers`. */
+  def requestContainers(containers: List[Resource]): Unit = {
+    LOG.info(s"request Resource, slots: ${containers.length}, ${containers.mkString("\n")}")
+    containers.foreach { resource =>
+      client.addContainerRequest(createContainerRequest(resource))
+    }
+  }
+
+  /** Builds a priority-0 request for `capability`; nodes and racks are left null. */
+  private def createContainerRequest(capability: Resource): ContainerRequest = {
+    LOG.info("creating ContainerRequest")
+    new ContainerRequest(capability, null, null, Priority.newInstance(0))
+  }
+
+  /** Registers the application master and then notifies `reportTo` with AppMasterRegistered. */
+  def registerAppMaster(host: String, port: Int, url: String): Unit = {
+    LOG.info(s"Received RegisterAMMessage! $host:$port $url")
+    client.registerApplicationMaster(host, port, url)
+    LOG.info("Received RegisterAppMasterResponse ")
+    reportTo ! AppMasterRegistered
+  }
+
+  /** Unregisters the application master with status SUCCEEDED and stops the client. */
+  def shutdownApplication(): Unit = {
+    LOG.info(s"Shutdown application")
+    client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "success", null)
+    client.stop()
+  }
+
+  /**
+   * Unregisters the application master with status FAILED, passing the exception message as
+   * diagnostics, and stops the client.
+   */
+  def failApplication(ex: Throwable): Unit = {
+    LOG.error(s"Application failed! ", ex)
+    // A failure was previously unregistered as SUCCEEDED, which hid the failure from YARN.
+    client.unregisterApplicationMaster(FinalApplicationStatus.FAILED, ex.getMessage, null)
+    client.stop()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala
new file mode 100644
index 0000000..ca729ce
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, YarnApplicationState}
+import org.apache.hadoop.yarn.util.{Records => YarnRecords}
+
+import scala.language.implicitConversions
+
+/**
+ * Wrapper types around the YARN record classes, plus the implicit conversions (visible inside
+ * the glue package only) between each wrapper and the YARN class it holds.
+ */
+object Records {
+  /** Creates an empty YARN record of the given class. */
+  def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz)
+
+  /** Creates an empty application submission context record. */
+  def newAppSubmissionContext: ApplicationSubmissionContext =
+    YarnRecords.newRecord(classOf[ApplicationSubmissionContext])
+
+  /** Wrapper of a YARN application id; equality and hash code delegate to the wrapped id. */
+  class ApplicationId(private[glue] val impl: YarnApplicationId) {
+    def getId: Int = impl.getId
+
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: ApplicationId => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  object ApplicationId {
+    def newInstance(timestamp: Long, id: Int): ApplicationId =
+      YarnApplicationId.newInstance(timestamp, id)
+  }
+
+  /** Wrapper of a YARN application report exposing a subset of its getters. */
+  class ApplicationReport(private[glue] val impl: YarnApplicationReport) {
+    def getApplicationId: ApplicationId = impl.getApplicationId
+
+    def getDiagnostics: String = impl.getDiagnostics
+
+    def getFinishTime: Long = impl.getFinishTime
+
+    def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl
+
+    def getYarnApplicationState: YarnApplicationState = impl.getYarnApplicationState
+
+    override def toString: String = impl.toString
+  }
+
+  /** Wrapper of a YARN resource; equality and hash code delegate to the wrapped resource. */
+  class Resource(private[glue] val impl: YarnResource) {
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: Resource => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  object Resource {
+    def newInstance(memory: Int, vCores: Int): Resource =
+      YarnResource.newInstance(memory, vCores)
+  }
+
+  /** Wrapper of a YARN container; equality and hash code delegate to the wrapped container. */
+  class Container(private[glue] val impl: YarnContainer) {
+    def getId: ContainerId = impl.getId
+
+    def getNodeHttpAddress: String = impl.getNodeHttpAddress
+
+    def getNodeId: NodeId = impl.getNodeId
+
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: Container => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  /** Wrapper of a YARN container id; equality and hash code delegate to the wrapped id. */
+  class ContainerId(private[glue] val impl: YarnContainerId) {
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: ContainerId => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  object ContainerId {
+    def fromString(worker: String): ContainerId = YarnContainerId.fromString(worker)
+  }
+
+  /** Wrapper of a YARN node id; equality and hash code delegate to the wrapped id. */
+  class NodeId(private[glue] val impl: YarnNodeId) {
+    def getHost: String = impl.getHost
+
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: NodeId => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  /** Wrapper of a YARN container status exposing a subset of its getters. */
+  class ContainerStatus(private[glue] val impl: YarnContainerStatus) {
+    def getDiagnostics: String = impl.getDiagnostics
+
+    def getContainerId: ContainerId = impl.getContainerId
+
+    def getExitStatus: Int = impl.getExitStatus
+
+    override def toString: String = impl.toString
+  }
+
+  // Implicit conversions: YARN record -> wrapper creates a new wrapper, wrapper -> YARN record
+  // returns the wrapped instance.
+
+  private[glue] implicit def yarnResourceToResource(res: YarnResource): Resource =
+    new Resource(res)
+
+  private[glue] implicit def resourceToYarnResource(res: Resource): YarnResource =
+    res.impl
+
+  private[glue] implicit def yarnAppIdToAppId(yarn: YarnApplicationId): ApplicationId =
+    new ApplicationId(yarn)
+
+  private[glue] implicit def appIdToYarnAppId(app: ApplicationId): YarnApplicationId =
+    app.impl
+
+  private[glue] implicit def yarnReportToReport(yarn: YarnApplicationReport): ApplicationReport =
+    new ApplicationReport(yarn)
+
+  private[glue] implicit def reportToYarnReport(app: ApplicationReport): YarnApplicationReport =
+    app.impl
+
+  private[glue] implicit def yarnContainerToContainer(yarn: YarnContainer): Container =
+    new Container(yarn)
+
+  private[glue] implicit def containerToYarnContainer(app: Container): YarnContainer =
+    app.impl
+
+  private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus)
+    : ContainerStatus =
+    new ContainerStatus(yarn)
+
+  private[glue] implicit def containerStatusToYarnContainerStatus(app: ContainerStatus)
+    : YarnContainerStatus =
+    app.impl
+
+  private[glue] implicit def containerIdToYarnContainerId(app: ContainerId): YarnContainerId =
+    app.impl
+
+  private[glue] implicit def yarnContainerIdToContainerId(yarn: YarnContainerId): ContainerId =
+    new ContainerId(yarn)
+
+  private[glue] implicit def nodeIdToYarnNodeId(app: NodeId): YarnNodeId =
+    app.impl
+
+  private[glue] implicit def yarnNodeIdToNodeId(yarn: YarnNodeId): NodeId =
+    new NodeId(yarn)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala
new file mode 100644
index 0000000..634dd0e
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.yarn.client.api
+
+/**
+ * Adapter for api.YarnClient. The wrapped client is created, initialized and started when
+ * this class is constructed.
+ */
+class YarnClient(yarn: YarnConfig) {
+
+  val LOG = LogUtil.getLogger(getClass)
+
+  private val client: api.YarnClient = api.YarnClient.createYarnClient
+  client.init(yarn.conf)
+  client.start()
+  LOG.info("Starting YarnClient...")
+
+  /** Asks YARN for a new application and returns its id. */
+  def createApplication: ApplicationId = {
+    val app = client.createApplication()
+    val response = app.getNewApplicationResponse()
+    LOG.info("Create application, appId: " + response.getApplicationId())
+    response.getApplicationId()
+  }
+
+  /** Fetches the current report of application `appId`. */
+  def getApplicationReport(appId: ApplicationId): ApplicationReport = {
+    client.getApplicationReport(appId)
+  }
+
+  /**
+   * Submits an application whose application master container runs `command`.
+   *
+   * @param name application name
+   * @param appId id previously obtained from `createApplication`
+   * @param command command for the application master container
+   * @param resource resource set on the submission context
+   * @param queue YARN queue to submit to
+   * @param packagePath package path passed to the container launch context
+   * @param configPath config path passed to the container launch context
+   * @return the id returned by the submission call
+   */
+  def submit(
+      name: String, appId: ApplicationId, command: String, resource: Resource, queue: String,
+      packagePath: String, configPath: String): ApplicationId = {
+
+    val appContext = Records.newAppSubmissionContext
+    appContext.setApplicationName(name)
+    appContext.setApplicationId(appId)
+
+    val containerContext = ContainerLaunchContext(yarn.conf, command, packagePath, configPath)
+    appContext.setAMContainerSpec(containerContext)
+    appContext.setResource(resource)
+    appContext.setQueue(queue)
+
+    LOG.info(s"Submit Application $appId to YARN...")
+    client.submitApplication(appContext)
+  }
+
+  /**
+   * Polls the application report about once per second until the application state is
+   * FINISHED, KILLED, FAILED or RUNNING, and returns that report.
+   *
+   * @param appId application to wait for
+   * @param timeoutMilliseconds maximum time to keep polling
+   * @throws Exception when no such report was obtained before the timeout elapsed
+   */
+  def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue)
+    : ApplicationReport = {
+    import org.apache.hadoop.yarn.api.records.YarnApplicationState._
+    // RUNNING is included on purpose: polling also stops once the application is running.
+    val terminated = Set(FINISHED, KILLED, FAILED, RUNNING)
+    var result: ApplicationReport = null
+    var done = false
+
+    val start = System.currentTimeMillis()
+    def timeout: Boolean = System.currentTimeMillis() - start > timeoutMilliseconds
+
+    while (!done && !timeout) {
+      val report = client.getApplicationReport(appId)
+      val status = report.getYarnApplicationState
+      if (terminated.contains(status)) {
+        done = true
+        result = report
+      } else {
+        Console.print(".")
+        Thread.sleep(1000)
+      }
+    }
+
+    // Decide on `done`, not on a fresh clock reading: a report obtained just as the deadline
+    // passes must be returned, not discarded.
+    if (!done) {
+      throw new Exception(s"Launch Application $appId timeout...")
+    }
+    result
+  }
+
+  /** Stops the wrapped client. */
+  def stop(): Unit = {
+    client.stop()
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala
new file mode 100644
index 0000000..4b927de
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+/**
+ * Wrapper for the YARN configuration. When no configuration is supplied, a
+ * `YarnConfiguration` built on top of `new Configuration(true)` is used.
+ */
+case class YarnConfig(conf: YarnConfiguration = new YarnConfiguration(new Configuration(true))) {
+
+  /** Writes the wrapped configuration as XML to `out`. */
+  def writeXml(out: java.io.Writer): Unit = {
+    conf.writeXml(out)
+  }
+
+  /** Value of the "yarn.resourcemanager.hostname" key in the wrapped configuration. */
+  def resourceManager: String = conf.get("yarn.resourcemanager.hostname")
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala
new file mode 100644
index 0000000..33e5778
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn
+
+/**
+ * YARN facade to decouple Gearpump with YARN.
+ */
+package object glue {
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
deleted file mode 100644
index 2a6bf38..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.transport.HostPort
-
-class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- val config = ConfigFactory.parseString(
-
- """
- |
- |gearpump {
- | yarn {
- | client {
- | package -path = "/user/gearpump/gearpump.zip"
- | }
- |
- | applicationmaster {
- | ## Memory of YarnAppMaster
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | memory = "512"
- | vcores = "1"
- | queue = "default"
- | }
- |
- | master {
- | ## Memory of master daemon
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | memory = "512"
- | vcores = "1"
- | }
- |
- | worker {
- | ## memory of worker daemon
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | containers = "4"
- | ## This also contains all memory for child executors.
- | memory = "4096"
- | vcores = "1"
- | }
- | services {
- | enabled = true
- | }
- | }
- |}
- """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
-
- "MasterCommand" should "create correct command line" in {
- val version = "gearpump-0.1"
- val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080))
-
- // scalastyle:off line.size.limit
- val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> io.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
- // scalastyle:on line.size.limit
- assert(master.get == expected)
- }
-
- "WorkerCommand" should "create correct command line" in {
- val version = "gearpump-0.1"
- val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine")
- // scalastyle:off line.size.limit
- val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine io.gearpump.cluster.main.Worker 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
- // scalastyle:on line.size.limit
- assert(worker.get == expected)
- }
-
- "AppMasterCommand" should "create correct command line" in {
- val version = "gearpump-0.1"
- val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3"))
- // scalastyle:off line.size.limit
- val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
- // scalastyle:on line.size.limit
- assert(appmaster.get == expected)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
deleted file mode 100644
index f8f9fe8..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Constants
-
-class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- implicit var system: ActorSystem = null
-
- override def beforeAll(): Unit = {
- system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "start UI server correctly" in {
- val probe = TestProbe()
- val masters = List(
- HostPort("127.0.0.1", 3000),
- HostPort("127.0.0.1", 3001),
- HostPort("127.0.0.1", 3002)
- )
- val host = "local"
- val port = 8091
-
- val ui = system.actorOf(Props(new MockUI(masters, host, port, probe.ref)))
-
- probe.expectMsgPF() {
- case info: Info => {
- assert(info.masterHost == "127.0.0.1")
- assert(info.masterPort == 3000)
- val conf = ConfigFactory.parseFile(new java.io.File(info.configFile))
- assert(conf.getString(Constants.GEARPUMP_SERVICE_HOST) == host)
- assert(conf.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091")
- assert(conf.getString(Constants.NETTY_TCP_HOSTNAME) == host)
- }
- }
-
- system.stop(ui)
- }
-}
-
-object UIServiceSpec {
-
- case class Info(supervisor: String, masterHost: String, masterPort: Int, configFile: String)
-
- class MockUI(masters: List[HostPort], host: String, port: Int, probe: ActorRef)
- extends UIService(masters, host, port) {
-
- override def launch(
- supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
- probe ! Info(supervisor, masterHost, masterPort, configFile)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
deleted file mode 100644
index 84d6d37..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.testkit.{TestActorRef, TestProbe}
-import com.typesafe.config.ConfigFactory
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.experiments.yarn.Constants
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, AppMasterRegistered, ClusterInfo, ContainerStarted, ContainersAllocated, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, ResourceManagerException, Version}
-import io.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI
-import io.gearpump.experiments.yarn.glue.Records.{Container, Resource, _}
-import io.gearpump.experiments.yarn.glue.{NMClient, RMClient}
-import io.gearpump.transport.HostPort
-
-class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
- val config = ConfigFactory.parseString(
-
- """
- |gearpump {
- | yarn {
- | client {
- | package -path = "/user/gearpump/gearpump.zip"
- | }
- |
- | applicationmaster {
- | ## Memory of YarnAppMaster
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | memory = "512"
- | vcores = "1"
- | queue = "default"
- | }
- |
- | master {
- | ## Memory of master daemon
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | memory = "512"
- | vcores = "1"
- | }
- |
- | worker {
- | ## memory of worker daemon
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | containers = "4"
- | ## This also contains all memory for child executors.
- | memory = "4096"
- | vcores = "1"
- | }
- | services {
- | enabled = true
- | }
- | }
- |}
- """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
-
- val masterCount = 1
- val workerCount = config.getString(Constants.WORKER_CONTAINERS).toInt
-
- implicit var system: ActorSystem = null
- val packagePath = "/user/gearpump/gearpump.zip"
- val configPath = "/user/my/conf"
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", config)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- private def startAppMaster(): (TestActorRef[YarnAppMaster], TestProbe, NMClient, RMClient) = {
- val rmClient = mock(classOf[RMClient])
- val nmClient = mock(classOf[NMClient])
- val ui = mock(classOf[UIFactory])
- when(ui.props(any[List[HostPort]], anyString, anyInt)).thenReturn(Props(new UI))
-
- val appMaster = TestActorRef[YarnAppMaster](Props(
- new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui)))
-
- verify(rmClient).start(appMaster)
- verify(nmClient).start(appMaster)
- verify(rmClient).registerAppMaster(anyString, anyInt, anyString)
-
- appMaster ! AppMasterRegistered
-
- val masterResources = ArgumentCaptor.forClass(classOf[List[Resource]])
- verify(rmClient).requestContainers(masterResources.capture())
- assert(masterResources.getValue.size == masterCount)
-
- val masterContainer = mock(classOf[Container])
- val mockNode = mock(classOf[NodeId])
- val mockId = mock(classOf[ContainerId])
- when(masterContainer.getNodeId).thenReturn(mockNode)
- when(masterContainer.getId).thenReturn(mockId)
-
- // Launchs master
- appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer))
- verify(nmClient,
- times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString)
-
- // Master containers started
- (0 until masterCount).foreach(_ => appMaster ! ContainerStarted(mockId))
-
- // Transition to start workers
- val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
- verify(rmClient, times(2)).requestContainers(workerResources.capture())
- assert(workerResources.getValue.size == workerCount)
-
- // Launchs workers
- val workerContainer = mock(classOf[Container])
- when(workerContainer.getNodeId).thenReturn(mockNode)
- val workerContainerId = ContainerId.fromString("container_1449802454214_0034_01_000006")
- when(workerContainer.getId).thenReturn(workerContainerId)
- appMaster ! ContainersAllocated(List.fill(workerCount)(workerContainer))
- verify(nmClient, times(workerCount + masterCount))
- .launchCommand(any[Container], anyString, anyString, anyString)
-
- // Worker containers started
- (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
-
- // Starts UI server
- verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
-
- // Application Ready...
- val client = TestProbe()
-
- // Gets active config
- appMaster.tell(GetActiveConfig("client"), client.ref)
- client.expectMsgType[ActiveConfig]
-
- // Queries version
- appMaster.tell(QueryVersion, client.ref)
- client.expectMsgType[Version]
-
- // Queries version
- appMaster.tell(QueryClusterInfo, client.ref)
- client.expectMsgType[ClusterInfo]
-
- // Adds worker
- val newWorkerCount = 2
- appMaster.tell(AddWorker(newWorkerCount), client.ref)
- client.expectMsgType[CommandResult]
- val newWorkerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
- verify(rmClient, times(3)).requestContainers(newWorkerResources.capture())
- assert(newWorkerResources.getValue.size == newWorkerCount)
-
- // New container allocated
- appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer))
- verify(nmClient, times(workerCount + masterCount + newWorkerCount)).
- launchCommand(any[Container], anyString, anyString, anyString)
-
- // New worker containers started
- (0 until newWorkerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
-
- // Same UI server
- verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
-
- // Removes worker
- appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref)
- client.expectMsgType[CommandResult]
- verify(nmClient).stopContainer(any[ContainerId], any[NodeId])
-
- (appMaster, client: TestProbe, nmClient, rmClient)
- }
-
- it should "start master, worker and UI on YARN" in {
- val env = startAppMaster()
- val (appMaster, client, nmClient, rmClient) = env
-
- // Kills the app
- appMaster.tell(Kill, client.ref)
- client.expectMsgType[CommandResult]
- verify(nmClient, times(1)).stop()
- verify(rmClient, times(1)).shutdownApplication()
- }
-
- it should "handle resource manager errors" in {
- val env = startAppMaster()
- val (appMaster, client, nmClient, rmClient) = env
-
- // on error
- val ex = new Exception("expected resource manager exception")
- appMaster.tell(ResourceManagerException(ex), client.ref)
- verify(nmClient, times(1)).stop()
- verify(rmClient, times(1)).failApplication(ex)
- }
-}
-
-object YarnAppMasterSpec {
-
- class UI extends Actor {
- def receive: Receive = null
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
deleted file mode 100644
index 3bd7f4f..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
-import java.util.Random
-import java.util.zip.{ZipEntry, ZipOutputStream}
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.Try
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
-import io.gearpump.util.FileUtils
-class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
- implicit var system: ActorSystem = null
-
- val rand = new Random()
-
- private def randomArray(size: Int): Array[Byte] = {
- val array = new Array[Byte](size)
- rand.nextBytes(array)
- array
- }
- val appId = ApplicationId.newInstance(0L, 0)
-
- val akka = ConfigFactory.parseString(
-
- """
- |gearpump {
- | yarn {
- | client {
- | package -path = "/user/gearpump/gearpump.zip"
- | }
- |
- | applicationmaster {
- | ## Memory of YarnAppMaster
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | memory = "512"
- | vcores = "1"
- | queue = "default"
- | }
- |
- | master {
- | ## Memory of master daemon
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | containers = "2"
- | memory = "512"
- | vcores = "1"
- | }
- |
- | worker {
- | ## memory of worker daemon
- | command = "$JAVA_HOME/bin/java -Xmx512m"
- | containers = "4"
- | ## This also contains all memory for child executors.
- | memory = "4096"
- | vcores = "1"
- | }
- | services {
- | enabled = true
- | }
- | }
- |}
- """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
-
- override def beforeAll(): Unit = {
- system = ActorSystem(getClass.getSimpleName, akka)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "reject non-zip files" in {
- val yarnConfig = mock(classOf[YarnConfig])
- val yarnClient = mock(classOf[YarnClient])
- val fs = mock(classOf[FileSystem])
- val appMasterResolver = mock(classOf[AppMasterResolver])
-
- val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
- val packagePath = "gearpump.zip2"
- assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
- }
-
- it should "reject if we cannot find the package file on HDFS" in {
- val yarnConfig = mock(classOf[YarnConfig])
- val yarnClient = mock(classOf[YarnClient])
- val fs = mock(classOf[FileSystem])
- val appMasterResolver = mock(classOf[AppMasterResolver])
-
- val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
- val packagePath = "gearpump.zip"
- when(fs.exists(anyString)).thenReturn(false)
- assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
- }
-
- it should "throw when package exists on HDFS, but the file is corrupted" in {
- val yarnConfig = mock(classOf[YarnConfig])
- val yarnClient = mock(classOf[YarnClient])
- val fs = mock(classOf[FileSystem])
- val appMasterResolver = mock(classOf[AppMasterResolver])
-
- val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
- val packagePath = "gearpump.zip"
- when(fs.exists(anyString)).thenReturn(true)
-
- val content = new ByteArrayInputStream(randomArray(10))
- when(fs.open(anyString)).thenReturn(content)
- assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
- content.close()
- }
-
- it should "throw when the HDFS package version is not consistent with local version" in {
- val yarnConfig = mock(classOf[YarnConfig])
- val yarnClient = mock(classOf[YarnClient])
- val fs = mock(classOf[FileSystem])
- val appMasterResolver = mock(classOf[AppMasterResolver])
-
- val version = "gearpump-0.2"
- val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
- appMasterResolver, version)
- val packagePath = "gearpump.zip"
- when(fs.exists(anyString)).thenReturn(true)
-
- val oldVesion = "gearpump-0.1"
- when(fs.open(anyString)).thenReturn(zipInputStream(oldVesion))
- assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
- }
-
- it should "upload config file to HDFS when submitting" in {
- val yarnConfig = mock(classOf[YarnConfig])
- val yarnClient = mock(classOf[YarnClient])
- val fs = mock(classOf[FileSystem])
- val appMasterResolver = mock(classOf[AppMasterResolver])
-
- val version = "gearpump-0.2"
- val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
- fs, system, appMasterResolver, version)
- val packagePath = "gearpump.zip"
-
- val out = mock(classOf[OutputStream])
- when(fs.exists(anyString)).thenReturn(true)
- when(fs.create(anyString)).thenReturn(out)
- when(fs.getHomeDirectory).thenReturn("/root")
-
- when(fs.open(anyString)).thenReturn(zipInputStream(version))
-
- val report = mock(classOf[ApplicationReport])
- when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report)
-
- when(report.getApplicationId).thenReturn(appId)
- when(yarnClient.createApplication).thenReturn(appId)
- assert(appId == launcher.submit("gearpump", packagePath))
-
- // 3 Config files are uploaded to HDFS, one is akka.conf,
- // one is yarn-site.xml, one is log4j.properties.
- verify(fs, times(3)).create(anyString)
- verify(out, times(3)).close()
-
- // val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
- // scalastyle:off line.size.limit
- val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
- // scalastyle:on line.size.limit
- verify(yarnClient).submit("gearpump", appId, expectedCommand,
- Resource.newInstance(512, 1), "default",
- "gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
- }
-
- it should "save active config from Gearpump cluster" in {
- val yarnConfig = mock(classOf[YarnConfig])
- val yarnClient = mock(classOf[YarnClient])
- val fs = mock(classOf[FileSystem])
- val appMasterResolver = mock(classOf[AppMasterResolver])
- val appMaster = TestProbe()
-
- val version = "gearpump-0.2"
- val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
- appMasterResolver, version)
- val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")
-
- when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref)
- val fileFuture = launcher.saveConfig(appId, outputPath.getPath)
- appMaster.expectMsgType[GetActiveConfig]
- appMaster.reply(ActiveConfig(ConfigFactory.empty()))
-
- import scala.concurrent.duration._
- val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]
-
- assert(!FileUtils.read(file).isEmpty)
- file.delete()
- }
-
- private def zipInputStream(version: String): InputStream = {
- val bytes = new ByteArrayOutputStream(1000)
- val zipOut = new ZipOutputStream(bytes)
-
- // Not available on BufferedOutputStream
- zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
- zipOut.write("README".getBytes())
- // Not available on BufferedOutputStream
- zipOut.closeEntry()
- zipOut.close()
- new ByteArrayInputStream(bytes.toByteArray)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
deleted file mode 100644
index b324ece..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.main.ParseResult
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
-import io.gearpump.experiments.yarn.client.ManageCluster._
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.util.FileUtils
-
-class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
- implicit var system: ActorSystem = null
-
- override def beforeAll(): Unit = {
- system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- }
-
- override def afterAll(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- it should "getConfig from remote Gearpump" in {
- val appId = ApplicationId.newInstance(0L, 0)
- val appMaster = TestProbe()
- val manager = new ManageCluster(appId, appMaster.ref, system)
-
- val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
-
- val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString),
- Array.empty[String]))
- appMaster.expectMsgType[GetActiveConfig]
- appMaster.reply(ActiveConfig(ConfigFactory.empty()))
- import scala.concurrent.duration._
- Await.result(future, 30.seconds)
-
- val content = FileUtils.read(output)
- assert(content.length > 0)
- output.delete()
- }
-
- it should "addworker" in {
- val appId = ApplicationId.newInstance(0L, 0)
- val appMaster = TestProbe()
- val manager = new ManageCluster(appId, appMaster.ref, system)
-
- val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString),
- Array.empty[String]))
- appMaster.expectMsg(AddWorker(1))
- appMaster.reply(CommandResult(true))
- import scala.concurrent.duration._
- val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
- assert(result.success)
- }
-
- it should "removeworker" in {
- val appId = ApplicationId.newInstance(0L, 0)
- val appMaster = TestProbe()
- val manager = new ManageCluster(appId, appMaster.ref, system)
-
- val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"),
- Array.empty[String]))
- appMaster.expectMsg(RemoveWorker("1"))
- appMaster.reply(CommandResult(true))
- import scala.concurrent.duration._
- val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
- assert(result.success)
- }
-
- it should "get version" in {
- val appId = ApplicationId.newInstance(0L, 0)
- val appMaster = TestProbe()
- val manager = new ManageCluster(appId, appMaster.ref, system)
- val future = manager.command(VERSION, new ParseResult(Map("container" -> "1"),
- Array.empty[String]))
- appMaster.expectMsg(QueryVersion)
- appMaster.reply(Version("version 0.1"))
- import scala.concurrent.duration._
- val result = Await.result(future, 30.seconds).asInstanceOf[Version]
- assert(result.version == "version 0.1")
- }
-
- it should "get cluster info" in {
- val appId = ApplicationId.newInstance(0L, 0)
- val appMaster = TestProbe()
- val manager = new ManageCluster(appId, appMaster.ref, system)
-
- val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
-
- val future = manager.command(QUERY, new ParseResult(Map.empty[String, String],
- Array.empty[String]))
- appMaster.expectMsg(QueryClusterInfo)
- appMaster.reply(ClusterInfo(List("master"), List("worker")))
- import scala.concurrent.duration._
- val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo]
- assert(result.masters.sameElements(List("master")))
- assert(result.workers.sameElements(List("worker")))
- }
-
- it should "kill the cluster" in {
- val appId = ApplicationId.newInstance(0L, 0)
- val appMaster = TestProbe()
- val manager = new ManageCluster(appId, appMaster.ref, system)
-
- val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
-
- val future = manager.command(KILL, new ParseResult(Map("container" -> "1"),
- Array.empty[String]))
- appMaster.expectMsg(Kill)
- appMaster.reply(CommandResult(true))
- import scala.concurrent.duration._
- val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
- assert(result.success)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
new file mode 100644
index 0000000..c4c5a65
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.transport.HostPort
+
+class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+ val config = ConfigFactory.parseString(
+
+ """
+ |
+ |gearpump {
+ | yarn {
+ | client {
+ | package -path = "/user/gearpump/gearpump.zip"
+ | }
+ |
+ | applicationmaster {
+ | ## Memory of YarnAppMaster
+ | command = "$JAVA_HOME/bin/java -Xmx512m"
+ | memory = "512"
+ | vcores = "1"
+ | queue = "default"
+ | }
+ |
+ | master {
+ | ## Memory of master daemon
+ | command = "$JAVA_HOME/bin/java -Xmx512m"
+ | memory = "512"
+ | vcores = "1"
+ | }
+ |
+ | worker {
+ | ## memory of worker daemon
+ | command = "$JAVA_HOME/bin/java -Xmx512m"
+ | containers = "4"
+ | ## This also contains all memory for child executors.
+ | memory = "4096"
+ | vcores = "1"
+ | }
+ | services {
+ | enabled = true
+ | }
+ | }
+ |}
+ """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+ "MasterCommand" should "create correct command line" in {
+ val version = "gearpump-0.1"
+ val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080))
+
+ // scalastyle:off line.size.limit
+ val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> org.apache.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+ // scalastyle:on line.size.limit
+ assert(master.get == expected)
+ }
+
+ "WorkerCommand" should "create correct command line" in {
+ val version = "gearpump-0.1"
+ val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine")
+ // scalastyle:off line.size.limit
+ val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine org.apache.gearpump.cluster.main.Worker 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+ // scalastyle:on line.size.limit
+ assert(worker.get == expected)
+ }
+
+ "AppMasterCommand" should "create correct command line" in {
+ val version = "gearpump-0.1"
+ val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3"))
+ // scalastyle:off line.size.limit
+ val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+ // scalastyle:on line.size.limit
+ assert(appmaster.get == expected)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
new file mode 100644
index 0000000..34af145
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+ implicit var system: ActorSystem = null
+
+ override def beforeAll(): Unit = {
+ system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG)
+ }
+
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+
+ it should "start UI server correctly" in {
+ val probe = TestProbe()
+ val masters = List(
+ HostPort("127.0.0.1", 3000),
+ HostPort("127.0.0.1", 3001),
+ HostPort("127.0.0.1", 3002)
+ )
+ val host = "local"
+ val port = 8091
+
+ val ui = system.actorOf(Props(new MockUI(masters, host, port, probe.ref)))
+
+ probe.expectMsgPF() {
+ case info: Info => {
+ assert(info.masterHost == "127.0.0.1")
+ assert(info.masterPort == 3000)
+ val conf = ConfigFactory.parseFile(new java.io.File(info.configFile))
+ assert(conf.getString(Constants.GEARPUMP_SERVICE_HOST) == host)
+ assert(conf.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091")
+ assert(conf.getString(Constants.NETTY_TCP_HOSTNAME) == host)
+ }
+ }
+
+ system.stop(ui)
+ }
+}
+
+object UIServiceSpec {
+
+ case class Info(supervisor: String, masterHost: String, masterPort: Int, configFile: String)
+
+ class MockUI(masters: List[HostPort], host: String, port: Int, probe: ActorRef)
+ extends UIService(masters, host, port) {
+
+ override def launch(
+ supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
+ probe ! Info(supervisor, masterHost, masterPort, configFile)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
new file mode 100644
index 0000000..055e822
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.testkit.{TestActorRef, TestProbe}
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.Constants
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, AppMasterRegistered, ClusterInfo, ContainerStarted, ContainersAllocated, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, ResourceManagerException, Version}
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI
+import org.apache.gearpump.experiments.yarn.glue.Records.{Container, Resource, _}
+import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient}
+import org.apache.gearpump.transport.HostPort
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+ val config = ConfigFactory.parseString(
+
+ """
+ |gearpump {
+ | yarn {
+ | client {
+ | package -path = "/user/gearpump/gearpump.zip"
+ | }
+ |
+ | applicationmaster {
+ | ## Memory of YarnAppMaster
+ | command = "$JAVA_HOME/bin/java -Xmx512m"
+ | memory = "512"
+ | vcores = "1"
+ | queue = "default"
+ | }
+ |
+ | master {
+ | ## Memory of master daemon
+ | command = "$JAVA_HOME/bin/java -Xmx512m"
+ | memory = "512"
+ | vcores = "1"
+ | }
+ |
+ | worker {
+ | ## memory of worker daemon
+ | command = "$JAVA_HOME/bin/java -Xmx512m"
+ | containers = "4"
+ | ## This also contains all memory for child executors.
+ | memory = "4096"
+ | vcores = "1"
+ | }
+ | services {
+ | enabled = true
+ | }
+ | }
+ |}
+ """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+ val masterCount = 1
+ val workerCount = config.getString(Constants.WORKER_CONTAINERS).toInt
+
+ implicit var system: ActorSystem = null
+ val packagePath = "/user/gearpump/gearpump.zip"
+ val configPath = "/user/my/conf"
+
+ override def beforeAll(): Unit = {
+ system = ActorSystem("test", config)
+ }
+
+ override def afterAll(): Unit = {
+ system.terminate()
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+
+ private def startAppMaster(): (TestActorRef[YarnAppMaster], TestProbe, NMClient, RMClient) = {
+ val rmClient = mock(classOf[RMClient])
+ val nmClient = mock(classOf[NMClient])
+ val ui = mock(classOf[UIFactory])
+ when(ui.props(any[List[HostPort]], anyString, anyInt)).thenReturn(Props(new UI))
+
+ val appMaster = TestActorRef[YarnAppMaster](Props(
+ new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui)))
+
+ verify(rmClient).start(appMaster)
+ verify(nmClient).start(appMaster)
+ verify(rmClient).registerAppMaster(anyString, anyInt, anyString)
+
+ appMaster ! AppMasterRegistered
+
+ val masterResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+ verify(rmClient).requestContainers(masterResources.capture())
+ assert(masterResources.getValue.size == masterCount)
+
+ val masterContainer = mock(classOf[Container])
+ val mockNode = mock(classOf[NodeId])
+ val mockId = mock(classOf[ContainerId])
+ when(masterContainer.getNodeId).thenReturn(mockNode)
+ when(masterContainer.getId).thenReturn(mockId)
+
+ // Launches master
+ appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer))
+ verify(nmClient,
+ times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString)
+
+ // Master containers started
+ (0 until masterCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+ // Transition to start workers
+ val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+ verify(rmClient, times(2)).requestContainers(workerResources.capture())
+ assert(workerResources.getValue.size == workerCount)
+
+ // Launches workers
+ val workerContainer = mock(classOf[Container])
+ when(workerContainer.getNodeId).thenReturn(mockNode)
+ val workerContainerId = ContainerId.fromString("container_1449802454214_0034_01_000006")
+ when(workerContainer.getId).thenReturn(workerContainerId)
+ appMaster ! ContainersAllocated(List.fill(workerCount)(workerContainer))
+ verify(nmClient, times(workerCount + masterCount))
+ .launchCommand(any[Container], anyString, anyString, anyString)
+
+ // Worker containers started
+ (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+ // Starts UI server
+ verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
+
+ // Application Ready...
+ val client = TestProbe()
+
+ // Gets active config
+ appMaster.tell(GetActiveConfig("client"), client.ref)
+ client.expectMsgType[ActiveConfig]
+
+ // Queries version
+ appMaster.tell(QueryVersion, client.ref)
+ client.expectMsgType[Version]
+
+ // Queries cluster info
+ appMaster.tell(QueryClusterInfo, client.ref)
+ client.expectMsgType[ClusterInfo]
+
+ // Adds worker
+ val newWorkerCount = 2
+ appMaster.tell(AddWorker(newWorkerCount), client.ref)
+ client.expectMsgType[CommandResult]
+ val newWorkerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+ verify(rmClient, times(3)).requestContainers(newWorkerResources.capture())
+ assert(newWorkerResources.getValue.size == newWorkerCount)
+
+ // New container allocated
+ appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer))
+ verify(nmClient, times(workerCount + masterCount + newWorkerCount)).
+ launchCommand(any[Container], anyString, anyString, anyString)
+
+ // New worker containers started
+ (0 until newWorkerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+ // Same UI server
+ verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
+
+ // Removes worker
+ appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref)
+ client.expectMsgType[CommandResult]
+ verify(nmClient).stopContainer(any[ContainerId], any[NodeId])
+
+ (appMaster, client: TestProbe, nmClient, rmClient)
+ }
+
+ it should "start master, worker and UI on YARN" in {
+ val env = startAppMaster()
+ val (appMaster, client, nmClient, rmClient) = env
+
+ // Kills the app
+ appMaster.tell(Kill, client.ref)
+ client.expectMsgType[CommandResult]
+ verify(nmClient, times(1)).stop()
+ verify(rmClient, times(1)).shutdownApplication()
+ }
+
+ it should "handle resource manager errors" in {
+ val env = startAppMaster()
+ val (appMaster, client, nmClient, rmClient) = env
+
+ // on error
+ val ex = new Exception("expected resource manager exception")
+ appMaster.tell(ResourceManagerException(ex), client.ref)
+ verify(nmClient, times(1)).stop()
+ verify(rmClient, times(1)).failApplication(ex)
+ }
+}
+
+object YarnAppMasterSpec {
+
+ class UI extends Actor {
+ def receive: Receive = null
+ }
+}
\ No newline at end of file