You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:31 UTC

[21/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
new file mode 100644
index 0000000..7e5d9ec
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import java.io.File
+import java.nio.ByteBuffer
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path}
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.mapreduce.security.TokenCache
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Factory for the YARN `ContainerLaunchContext` records used to start Gearpump containers.
+ *
+ * A context carries the command to run, the CLASSPATH environment variable, the security
+ * tokens of the current user and the two local resources ("pack" and "conf").
+ */
+private[glue]
+object ContainerLaunchContext {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  /**
+   * Builds a launch context for a single command.
+   *
+   * @param yarnConf    YARN configuration used for the classpath, file system and tokens
+   * @param command     command line to execute inside the container
+   * @param packagePath path of the archive that is localized under the name "pack"
+   * @param configPath  path of the file that is localized under the name "conf"
+   * @return a populated YARN launch context record
+   */
+  def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String)
+    : ContainerLaunchContext = {
+    val context = Records.newRecord(classOf[ContainerLaunchContext])
+    context.setCommands(Seq(command).asJava)
+    context.setEnvironment(getAppEnv(yarnConf).asJava)
+    context.setTokens(getToken(yarnConf, packagePath, configPath))
+    context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava)
+    context
+  }
+
+  private def getFs(yarnConf: YarnConfiguration) = YarnFileSystem.get(yarnConf)
+
+  /**
+   * Builds the container environment, which consists of the CLASSPATH variable only.
+   *
+   * The classpath is the configured YARN application classpath (or YARN's default when
+   * the key is not set) followed by a wildcard entry for the container working directory.
+   */
+  private def getAppEnv(yarnConf: YarnConfiguration): Map[String, String] = {
+    val classPaths = yarnConf.getStrings(
+      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.mkString(File.pathSeparator))
+
+    // Arrays are immutable with respect to `:+`: it returns a NEW array, so the result has
+    // to be kept. The entry carries no trailing separator because mkString adds them.
+    val workingDirEntry = Environment.PWD.$() + File.separator + "*"
+    val allPaths = Option(classPaths).getOrElse(Array("")) :+ workingDirEntry
+
+    Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator))
+  }
+
+  /**
+   * Describes the two files every container needs: the package archive ("pack") and the
+   * configuration file ("conf"), both with APPLICATION visibility.
+   */
+  private def getAMLocalResourcesMap(
+      yarnConf: YarnConfiguration, packagePath: String, configPath: String)
+    : Map[String, LocalResource] = {
+    val fs = getFs(yarnConf)
+
+    Map(
+      "pack" -> newYarnAppResource(fs, new Path(packagePath),
+        LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION),
+      "conf" -> newYarnAppResource(fs, new Path(configPath),
+        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION))
+  }
+
+  /**
+   * Creates a `LocalResource` record for `path`, copying the size and modification time
+   * reported by the file system for the qualified path.
+   */
+  private def newYarnAppResource(
+      fs: YarnFileSystem, path: Path,
+      resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
+    val qualified = fs.makeQualified(path)
+    val status = fs.getFileStatus(qualified)
+    val resource = Records.newRecord(classOf[LocalResource])
+    resource.setType(resourceType)
+    resource.setVisibility(vis)
+    resource.setResource(ConverterUtils.getYarnUrlFromPath(qualified))
+    resource.setTimestamp(status.getModificationTime)
+    resource.setSize(status.getLen)
+    resource
+  }
+
+  /**
+   * Serializes the current user's credentials, after obtaining name node tokens for the
+   * package and config paths, into a byte buffer.
+   */
+  private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String)
+    : ByteBuffer = {
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    val dob = new DataOutputBuffer
+    val dirs = Array(new Path(packagePath), new Path(configPath))
+    TokenCache.obtainTokensForNamenodes(credentials, dirs, yc)
+    credentials.writeTokenStorageToStream(dob)
+    // getData exposes the whole backing array; only the first getLength bytes were written.
+    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala
new file mode 100644
index 0000000..dcb53e9
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import java.io.{InputStream, OutputStream}
+import java.net.ConnectException
+
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.fs.Path
+
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Small adapter over the Hadoop file system resolved from the given [[YarnConfig]].
+ *
+ * Paths are passed as plain strings. Failures are rethrown unchanged; connection
+ * failures are additionally logged with a hint about the Hadoop configuration.
+ */
+class FileSystem(yarnConfig: YarnConfig) {
+
+  private val conf = yarnConfig.conf
+  private val fs = org.apache.hadoop.fs.FileSystem.get(conf)
+
+  // Looked up once instead of on every log call.
+  private val LOG = LogUtil.getLogger(getClass)
+
+  /** Opens `file` for reading. */
+  def open(file: String): InputStream = exceptionHandler {
+    val path = new Path(file)
+    fs.open(path)
+  }
+
+  /** Creates `file` and returns a stream for writing to it. */
+  def create(file: String): OutputStream = exceptionHandler {
+    val path = new Path(file)
+    fs.create(path)
+  }
+
+  /** Returns true when `file` exists on this file system. */
+  def exists(file: String): Boolean = exceptionHandler {
+    val path = new Path(file)
+    fs.exists(path)
+  }
+
+  /** URI of the underlying file system, as a string. */
+  def name: String = {
+    fs.getUri.toString
+  }
+
+  /** Home directory reported by the underlying file system, as a string. */
+  def getHomeDirectory: String = {
+    fs.getHomeDirectory.toString
+  }
+
+  /**
+   * Runs `call` and rethrows any failure. A `ConnectException` is logged first, together
+   * with the URI of the file system that was contacted.
+   */
+  private def exceptionHandler[T](call: => T): T = {
+    Try(call) match {
+      case Success(v) => v
+      case Failure(ex) =>
+        ex match {
+          case _: ConnectException =>
+            // Only the first literal is interpolated, so the actual URI is printed.
+            // "$HADOOP_HOME" below is meant literally and must stay a plain string.
+            LOG.error("Please check whether we connect to the right HDFS file system, " +
+              s"current file system is ${name}." + "\n. Please copy all configs under " +
+              "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " +
+              "so that we can use the right File system.", ex)
+          case _ => // other failures are just propagated
+        }
+        throw ex
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
new file mode 100644
index 0000000..59f3832
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import java.nio.ByteBuffer
+
+import akka.actor.ActorRef
+import com.typesafe.config.Config
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource}
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+/**
+ * Adapter for the YARN node manager client.
+ *
+ * Wraps an asynchronous node manager client, registers itself as its callback handler
+ * and forwards container start notifications to an actor.
+ */
+class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.CallbackHandler {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  // Both fields are assigned in start() and remain null until it has been called.
+  private var reportTo: ActorRef = null
+  private var client: NMClientAsyncImpl = null
+
+  /**
+   * Creates, initializes and starts the underlying client.
+   *
+   * @param reportTo actor that receives `ContainerStarted` messages
+   */
+  def start(reportTo: ActorRef): Unit = {
+    LOG.info("Starting Node Manager Client NMClient...")
+    this.reportTo = reportTo
+    client = new NMClientAsyncImpl(this)
+    client.init(yarnConf.conf)
+    client.start()
+  }
+
+  /** Logs the event and notifies the registered actor that the container is running. */
+  private[glue]
+  override def onContainerStarted(
+      containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer])
+    : Unit = {
+    LOG.info(s"Container started : $containerId, $allServiceResponse")
+    reportTo ! ContainerStarted(containerId)
+  }
+
+  /** Only logs the received status. */
+  private[glue]
+  override def onContainerStatusReceived(
+      containerId: YarnContainerId, containerStatus: YarnContainerStatus): Unit = {
+    LOG.info(s"Container status received : $containerId, status $containerStatus")
+  }
+
+  /** Only logs that the container stopped. */
+  private[glue]
+  override def onContainerStopped(containerId: YarnContainerId): Unit = {
+    LOG.error(s"Container stopped : $containerId")
+  }
+
+  private[glue]
+  override def onGetContainerStatusError(containerId: YarnContainerId, throwable: Throwable)
+    : Unit = {
+    logContainerFailure(containerId, throwable)
+  }
+
+  private[glue]
+  override def onStartContainerError(containerId: YarnContainerId, throwable: Throwable)
+    : Unit = {
+    logContainerFailure(containerId, throwable)
+  }
+
+  private[glue]
+  override def onStopContainerError(containerId: YarnContainerId, throwable: Throwable)
+    : Unit = {
+    logContainerFailure(containerId, throwable)
+  }
+
+  /** Shared handling of the three error callbacks: log the container id and the cause. */
+  private def logContainerFailure(containerId: YarnContainerId, cause: Throwable): Unit = {
+    LOG.error(s"Container exception : $containerId", cause)
+  }
+
+  /**
+   * Asynchronously starts `command` on an allocated container.
+   *
+   * @param container   container to start
+   * @param command     command line to run in the container
+   * @param packagePath package path handed to the launch context
+   * @param configPath  config path handed to the launch context
+   */
+  def launchCommand(
+      container: Container, command: String, packagePath: String, configPath: String): Unit = {
+    val target = s":  ${container.getId}, host ip : ${container.getNodeId.getHost}"
+    LOG.info(s"Launching command : $command on container" + target)
+    val launchContext = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath)
+    client.startContainerAsync(container, launchContext)
+  }
+
+  /** Asynchronously asks the node `nodeId` to stop the container `containerId`. */
+  def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = {
+    val message = s"Stop container ${containerId.toString} on node: ${nodeId.toString} "
+    LOG.info(message)
+    client.stopContainerAsync(containerId, nodeId)
+  }
+
+  /** Stops the underlying client. */
+  def stop(): Unit = {
+    LOG.info("Shutdown NMClient")
+    client.stop()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
new file mode 100644
index 0000000..629e233
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import akka.actor.ActorRef
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource}
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+
+/**
+ * Adapter for the YARN resource manager client.
+ *
+ * Wraps an asynchronous application-master client, registers itself as its callback
+ * handler and translates the callbacks into messages for an actor.
+ */
+class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  // Both fields are assigned in start() and remain null until it has been called.
+  private var reportTo: ActorRef = null
+  private var client: AMRMClientAsync[ContainerRequest] = null
+
+  /**
+   * Creates and starts the underlying client.
+   *
+   * @param reportTo actor that receives the resource manager events
+   */
+  def start(reportTo: ActorRef): Unit = {
+    LOG.info("Starting Resource Manager Client RMClient...")
+    this.reportTo = reportTo
+    client = startAMRMClient
+  }
+
+  /** Creates, initializes and starts a client that uses a 1000 ms interval. */
+  private def startAMRMClient: AMRMClientAsync[ContainerRequest] = {
+    val timeIntervalMs = 1000 // ms
+    val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this)
+    amrmClient.init(yarnConf.conf)
+    amrmClient.start()
+    amrmClient
+  }
+
+  /** Progress is not tracked; a constant 0.5 is reported. */
+  override def getProgress: Float = 0.5F
+
+  // Ids of every container already forwarded, used to drop repeated allocations.
+  private var allocatedContainers = Set.empty[YarnContainerId]
+
+  /** Forwards the containers that have not been reported before as `ContainersAllocated`. */
+  private[glue]
+  override def onContainersAllocated(containers: java.util.List[YarnContainer]): Unit = {
+    val newContainers = containers.asScala.toList.filterNot(container =>
+      allocatedContainers.contains(container.getId))
+    allocatedContainers ++= newContainers.map(_.getId)
+    LOG.info(s"New allocated ${newContainers.size} containers")
+    reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_)))
+  }
+
+  /** Forwards the completed container statuses as `ContainersCompleted`. */
+  private[glue]
+  override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus])
+    : Unit = {
+    LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}")
+    reportTo ! ContainersCompleted(
+      completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
+  }
+
+  /** Logs the failure with its cause and forwards it as `ResourceManagerException`. */
+  private[glue]
+  override def onError(ex: Throwable): Unit = {
+    // Logged at error level together with the throwable, so the cause is not lost.
+    LOG.error("Error occurred", ex)
+    reportTo ! ResourceManagerException(ex)
+  }
+
+  /** Node updates are only logged. */
+  private[glue]
+  override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = {
+    LOG.info("onNodesUpdates")
+  }
+
+  /** Forwards the shutdown request as `ShutdownApplication`. */
+  private[glue]
+  override def onShutdownRequest(): Unit = {
+    LOG.info("Shutdown requested")
+    reportTo ! ShutdownApplication
+  }
+
+  /** Issues one container request per entry of `containers`. */
+  def requestContainers(containers: List[Resource]): Unit = {
+    LOG.info(s"request Resource, slots: ${containers.length}, ${containers.mkString("\n")}")
+    containers.foreach(resource => {
+      client.addContainerRequest(createContainerRequest(resource))
+    })
+  }
+
+  /** Builds a priority-0 request for `capability` without node or rack constraints. */
+  private def createContainerRequest(capability: Resource): ContainerRequest = {
+    LOG.info("creating ContainerRequest")
+    val priority = Priority.newInstance(0)
+    new ContainerRequest(capability, null, null, priority)
+  }
+
+  /**
+   * Registers the application master and then reports `AppMasterRegistered`.
+   *
+   * @param host host name passed to the registration call
+   * @param port port passed to the registration call
+   * @param url  tracking URL passed to the registration call
+   */
+  def registerAppMaster(host: String, port: Int, url: String): Unit = {
+    LOG.info(s"Received RegisterAMMessage! $host:$port $url")
+    client.registerApplicationMaster(host, port, url)
+    LOG.info("Received RegisterAppMasterResponse ")
+    reportTo ! AppMasterRegistered
+  }
+
+  /** Unregisters the application master with status SUCCEEDED and stops the client. */
+  def shutdownApplication(): Unit = {
+    LOG.info(s"Shutdown application")
+    client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "success", null)
+    client.stop()
+  }
+
+  /**
+   * Unregisters the application master with status FAILED, using the message of `ex` as
+   * the diagnostics text, and stops the client.
+   */
+  def failApplication(ex: Throwable): Unit = {
+    LOG.error(s"Application failed! ", ex)
+    // A failed application must not be reported to YARN as SUCCEEDED.
+    client.unregisterApplicationMaster(FinalApplicationStatus.FAILED, ex.getMessage, null)
+    client.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala
new file mode 100644
index 0000000..ca729ce
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, YarnApplicationState}
+import org.apache.hadoop.yarn.util.{Records => YarnRecords}
+
+import scala.language.implicitConversions
+
+/**
+ * Gearpump-side wrappers for YARN record types, plus the implicit views that convert
+ * between a wrapper and the YARN record it holds.
+ *
+ * Every wrapper keeps the YARN record in `impl` and delegates to it. The implicit views
+ * are visible inside the `glue` package only.
+ */
+object Records {
+  /** Creates an empty YARN record of the given class. */
+  def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz)
+
+  /** Creates an empty application submission context record. */
+  def newAppSubmissionContext: ApplicationSubmissionContext =
+    YarnRecords.newRecord(classOf[ApplicationSubmissionContext])
+
+  /** Wrapper for a YARN application id; equality and hash code follow the wrapped id. */
+  class ApplicationId(private[glue] val impl: YarnApplicationId) {
+    def getId: Int = impl.getId
+
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: ApplicationId => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  object ApplicationId {
+    def newInstance(timestamp: Long, id: Int): ApplicationId =
+      new ApplicationId(YarnApplicationId.newInstance(timestamp, id))
+  }
+
+  /** Wrapper for a YARN application report. */
+  class ApplicationReport(private[glue] val impl: YarnApplicationReport) {
+    def getApplicationId: ApplicationId = new ApplicationId(impl.getApplicationId)
+
+    def getDiagnostics: String = impl.getDiagnostics
+
+    def getFinishTime: Long = impl.getFinishTime
+
+    def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl
+
+    def getYarnApplicationState: YarnApplicationState = impl.getYarnApplicationState
+
+    override def toString: String = impl.toString
+  }
+
+  /** Wrapper for a YARN resource; equality and hash code follow the wrapped resource. */
+  class Resource(private[glue] val impl: YarnResource) {
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: Resource => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  object Resource {
+    def newInstance(memory: Int, vCores: Int): Resource =
+      new Resource(YarnResource.newInstance(memory, vCores))
+  }
+
+  /** Wrapper for a YARN container; equality and hash code follow the wrapped container. */
+  class Container(private[glue] val impl: YarnContainer) {
+    def getId: ContainerId = new ContainerId(impl.getId)
+
+    def getNodeHttpAddress: String = impl.getNodeHttpAddress
+
+    def getNodeId: NodeId = new NodeId(impl.getNodeId)
+
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: Container => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  /** Wrapper for a YARN container id; equality and hash code follow the wrapped id. */
+  class ContainerId(private[glue] val impl: YarnContainerId) {
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: ContainerId => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  object ContainerId {
+    def fromString(worker: String): ContainerId =
+      new ContainerId(YarnContainerId.fromString(worker))
+  }
+
+  /** Wrapper for a YARN node id; equality and hash code follow the wrapped id. */
+  class NodeId(private[glue] val impl: YarnNodeId) {
+    def getHost: String = impl.getHost
+
+    override def toString: String = impl.toString
+
+    override def equals(other: Any): Boolean = other match {
+      case that: NodeId => impl.equals(that.impl)
+      case _ => false
+    }
+
+    override def hashCode(): Int = impl.hashCode()
+  }
+
+  /** Wrapper for a YARN container status. */
+  class ContainerStatus(private[glue] val impl: YarnContainerStatus) {
+    def getDiagnostics: String = impl.getDiagnostics
+
+    def getContainerId: ContainerId = new ContainerId(impl.getContainerId)
+
+    def getExitStatus: Int = impl.getExitStatus
+
+    override def toString: String = impl.toString
+  }
+
+  // Implicit views, one pair per wrapper: YARN record -> wrapper and wrapper -> YARN record.
+
+  private[glue] implicit def yarnResourceToResource(res: YarnResource): Resource =
+    new Resource(res)
+
+  private[glue] implicit def resourceToYarnResource(res: Resource): YarnResource =
+    res.impl
+
+  private[glue] implicit def yarnAppIdToAppId(yarn: YarnApplicationId): ApplicationId =
+    new ApplicationId(yarn)
+
+  private[glue] implicit def appIdToYarnAppId(app: ApplicationId): YarnApplicationId =
+    app.impl
+
+  private[glue] implicit def yarnReportToReport(yarn: YarnApplicationReport): ApplicationReport =
+    new ApplicationReport(yarn)
+
+  private[glue] implicit def reportToYarnReport(app: ApplicationReport): YarnApplicationReport =
+    app.impl
+
+  private[glue] implicit def yarnContainerToContainer(yarn: YarnContainer): Container =
+    new Container(yarn)
+
+  private[glue] implicit def containerToYarnContainer(app: Container): YarnContainer =
+    app.impl
+
+  private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus)
+    : ContainerStatus =
+    new ContainerStatus(yarn)
+
+  private[glue] implicit def containerStatusToYarnContainerStatus(app: ContainerStatus)
+    : YarnContainerStatus =
+    app.impl
+
+  private[glue] implicit def containerIdToYarnContainerId(app: ContainerId): YarnContainerId =
+    app.impl
+
+  private[glue] implicit def yarnContainerIdToContainerId(yarn: YarnContainerId): ContainerId =
+    new ContainerId(yarn)
+
+  private[glue] implicit def nodeIdToYarnNodeId(app: NodeId): YarnNodeId =
+    app.impl
+
+  private[glue] implicit def yarnNodeIdToNodeId(yarn: YarnNodeId): NodeId =
+    new NodeId(yarn)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala
new file mode 100644
index 0000000..634dd0e
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.util.LogUtil
+import org.apache.hadoop.yarn.client.api
+
+/**
+ * Adapter for api.YarnClient
+ *
+ * The underlying client is created, initialized and started when this class is
+ * constructed; call `stop()` to shut it down.
+ */
+class YarnClient(yarn: YarnConfig) {
+
+  val LOG = LogUtil.getLogger(getClass)
+
+  private val client: api.YarnClient = api.YarnClient.createYarnClient
+  client.init(yarn.conf)
+  client.start()
+  LOG.info("Starting YarnClient...")
+
+  /** Asks YARN for a new application and returns the id assigned to it. */
+  def createApplication: ApplicationId = {
+    val app = client.createApplication()
+    val response = app.getNewApplicationResponse()
+    LOG.info("Create application, appId: " + response.getApplicationId())
+    response.getApplicationId()
+  }
+
+  /** Fetches the current report of the application `appId`. */
+  def getApplicationReport(appId: ApplicationId): ApplicationReport = {
+    client.getApplicationReport(appId)
+  }
+
+  /**
+   * Submits an application whose application master is started with `command`.
+   *
+   * @param name        application name
+   * @param appId       application id, as returned by `createApplication`
+   * @param command     command line for the application master container
+   * @param resource    resource set on the submission context
+   * @param queue       queue the application is submitted to
+   * @param packagePath package path handed to the launch context
+   * @param configPath  config path handed to the launch context
+   * @return the id returned by the submission call
+   */
+  def submit(
+      name: String, appId: ApplicationId, command: String, resource: Resource, queue: String,
+      packagePath: String, configPath: String): ApplicationId = {
+
+    val appContext = Records.newAppSubmissionContext
+    appContext.setApplicationName(name)
+    appContext.setApplicationId(appId)
+
+    val containerContext = ContainerLaunchContext(yarn.conf, command, packagePath, configPath)
+    appContext.setAMContainerSpec(containerContext)
+    appContext.setResource(resource)
+    appContext.setQueue(queue)
+
+    LOG.info(s"Submit Application $appId to YARN...")
+    client.submitApplication(appContext)
+  }
+
+  /**
+   * Polls the application once per second until its state is FINISHED, KILLED, FAILED or
+   * RUNNING, printing a dot to the console for every unsuccessful poll.
+   *
+   * @param appId               application to wait for
+   * @param timeoutMilliseconds maximum time to keep polling
+   * @return the report in which one of the awaited states was observed
+   * @throws Exception when no awaited state was observed before the timeout elapsed
+   */
+  def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue)
+    : ApplicationReport = {
+    import org.apache.hadoop.yarn.api.records.YarnApplicationState._
+    // RUNNING is included on purpose: the wait also ends once the application is up.
+    val terminated = Set(FINISHED, KILLED, FAILED, RUNNING)
+
+    val start = System.currentTimeMillis()
+    def timeout: Boolean = System.currentTimeMillis() - start > timeoutMilliseconds
+
+    var result: Option[ApplicationReport] = None
+    while (result.isEmpty && !timeout) {
+      val report: ApplicationReport = client.getApplicationReport(appId)
+      if (terminated.contains(report.getYarnApplicationState)) {
+        result = Some(report)
+      } else {
+        Console.print(".")
+        Thread.sleep(1000)
+      }
+    }
+
+    // Decide on whether a report was obtained, not on a second look at the clock:
+    // otherwise a report found just as the deadline passes would be thrown away.
+    result.getOrElse(throw new Exception(s"Launch Application $appId timeout..."))
+  }
+
+  /** Stops the underlying client. */
+  def stop(): Unit = {
+    client.stop()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala
new file mode 100644
index 0000000..4b927de
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.glue
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+/**
+ * Wrapper around a Hadoop `YarnConfiguration`.
+ *
+ * When no configuration is supplied, one is built with
+ * `new YarnConfiguration(new Configuration(true))`.
+ */
+case class YarnConfig(conf: YarnConfiguration = new YarnConfiguration(new Configuration(true))) {
+
+  /** Writes the wrapped configuration as XML to `out`. */
+  def writeXml(out: java.io.Writer): Unit = {
+    conf.writeXml(out)
+  }
+
+  /** Value configured under the key "yarn.resourcemanager.hostname". */
+  def resourceManager: String = conf.get("yarn.resourcemanager.hostname")
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala
new file mode 100644
index 0000000..33e5778
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn
+
+/**
+ * YARN facade to decouple Gearpump from YARN.
+ */
+package object glue {
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
deleted file mode 100644
index 2a6bf38..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.transport.HostPort
-
-class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  val config = ConfigFactory.parseString(
-
-    """
-      |
-      |gearpump {
-      |  yarn {
-      |    client {
-      |      package -path = "/user/gearpump/gearpump.zip"
-      |    }
-      |
-      |    applicationmaster {
-      |      ## Memory of YarnAppMaster
-      |        command = "$JAVA_HOME/bin/java -Xmx512m"
-      |      memory = "512"
-      |      vcores = "1"
-      |      queue = "default"
-      |    }
-      |
-      |    master {
-      |      ## Memory of master daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      memory = "512"
-      |      vcores = "1"
-      |    }
-      |
-      |    worker {
-      |      ## memory of worker daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      containers = "4"
-      |      ## This also contains all memory for child executors.
-      |      memory = "4096"
-      |      vcores = "1"
-      |    }
-      |    services {
-      |      enabled = true
-      |    }
-      |  }
-      |}
-    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
-
-  "MasterCommand" should "create correct command line" in {
-    val version = "gearpump-0.1"
-    val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080))
-
-    // scalastyle:off line.size.limit
-    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> io.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
-    // scalastyle:on line.size.limit
-    assert(master.get == expected)
-  }
-
-  "WorkerCommand" should "create correct command line" in {
-    val version = "gearpump-0.1"
-    val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine")
-    // scalastyle:off line.size.limit
-    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine io.gearpump.cluster.main.Worker  2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
-    // scalastyle:on line.size.limit
-    assert(worker.get == expected)
-  }
-
-  "AppMasterCommand" should "create correct command line" in {
-    val version = "gearpump-0.1"
-    val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3"))
-    // scalastyle:off line.size.limit
-    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster  arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
-    // scalastyle:on line.size.limit
-    assert(appmaster.get == expected)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
deleted file mode 100644
index f8f9fe8..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Constants
-
-class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  implicit var system: ActorSystem = null
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "start UI server correctly" in {
-    val probe = TestProbe()
-    val masters = List(
-      HostPort("127.0.0.1", 3000),
-      HostPort("127.0.0.1", 3001),
-      HostPort("127.0.0.1", 3002)
-    )
-    val host = "local"
-    val port = 8091
-
-    val ui = system.actorOf(Props(new MockUI(masters, host, port, probe.ref)))
-
-    probe.expectMsgPF() {
-      case info: Info => {
-        assert(info.masterHost == "127.0.0.1")
-        assert(info.masterPort == 3000)
-        val conf = ConfigFactory.parseFile(new java.io.File(info.configFile))
-        assert(conf.getString(Constants.GEARPUMP_SERVICE_HOST) == host)
-        assert(conf.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091")
-        assert(conf.getString(Constants.NETTY_TCP_HOSTNAME) == host)
-      }
-    }
-
-    system.stop(ui)
-  }
-}
-
-object UIServiceSpec {
-
-  case class Info(supervisor: String, masterHost: String, masterPort: Int, configFile: String)
-
-  class MockUI(masters: List[HostPort], host: String, port: Int, probe: ActorRef)
-    extends UIService(masters, host, port) {
-
-    override def launch(
-        supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
-      probe ! Info(supervisor, masterHost, masterPort, configFile)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
deleted file mode 100644
index 84d6d37..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.testkit.{TestActorRef, TestProbe}
-import com.typesafe.config.ConfigFactory
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.experiments.yarn.Constants
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, AppMasterRegistered, ClusterInfo, ContainerStarted, ContainersAllocated, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, ResourceManagerException, Version}
-import io.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI
-import io.gearpump.experiments.yarn.glue.Records.{Container, Resource, _}
-import io.gearpump.experiments.yarn.glue.{NMClient, RMClient}
-import io.gearpump.transport.HostPort
-
-class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  val config = ConfigFactory.parseString(
-
-    """
-      |gearpump {
-      |  yarn {
-      |    client {
-      |      package -path = "/user/gearpump/gearpump.zip"
-      |    }
-      |
-      |    applicationmaster {
-      |      ## Memory of YarnAppMaster
-      |        command = "$JAVA_HOME/bin/java -Xmx512m"
-      |      memory = "512"
-      |      vcores = "1"
-      |      queue = "default"
-      |    }
-      |
-      |    master {
-      |      ## Memory of master daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      memory = "512"
-      |      vcores = "1"
-      |    }
-      |
-      |    worker {
-      |      ## memory of worker daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      containers = "4"
-      |      ## This also contains all memory for child executors.
-      |      memory = "4096"
-      |      vcores = "1"
-      |    }
-      |    services {
-      |      enabled = true
-      |    }
-      |  }
-      |}
-    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
-
-  val masterCount = 1
-  val workerCount = config.getString(Constants.WORKER_CONTAINERS).toInt
-
-  implicit var system: ActorSystem = null
-  val packagePath = "/user/gearpump/gearpump.zip"
-  val configPath = "/user/my/conf"
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", config)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  private def startAppMaster(): (TestActorRef[YarnAppMaster], TestProbe, NMClient, RMClient) = {
-    val rmClient = mock(classOf[RMClient])
-    val nmClient = mock(classOf[NMClient])
-    val ui = mock(classOf[UIFactory])
-    when(ui.props(any[List[HostPort]], anyString, anyInt)).thenReturn(Props(new UI))
-
-    val appMaster = TestActorRef[YarnAppMaster](Props(
-      new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui)))
-
-    verify(rmClient).start(appMaster)
-    verify(nmClient).start(appMaster)
-    verify(rmClient).registerAppMaster(anyString, anyInt, anyString)
-
-    appMaster ! AppMasterRegistered
-
-    val masterResources = ArgumentCaptor.forClass(classOf[List[Resource]])
-    verify(rmClient).requestContainers(masterResources.capture())
-    assert(masterResources.getValue.size == masterCount)
-
-    val masterContainer = mock(classOf[Container])
-    val mockNode = mock(classOf[NodeId])
-    val mockId = mock(classOf[ContainerId])
-    when(masterContainer.getNodeId).thenReturn(mockNode)
-    when(masterContainer.getId).thenReturn(mockId)
-
-    // Launchs master
-    appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer))
-    verify(nmClient,
-      times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString)
-
-    // Master containers started
-    (0 until masterCount).foreach(_ => appMaster ! ContainerStarted(mockId))
-
-    // Transition to start workers
-    val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
-    verify(rmClient, times(2)).requestContainers(workerResources.capture())
-    assert(workerResources.getValue.size == workerCount)
-
-    // Launchs workers
-    val workerContainer = mock(classOf[Container])
-    when(workerContainer.getNodeId).thenReturn(mockNode)
-    val workerContainerId = ContainerId.fromString("container_1449802454214_0034_01_000006")
-    when(workerContainer.getId).thenReturn(workerContainerId)
-    appMaster ! ContainersAllocated(List.fill(workerCount)(workerContainer))
-    verify(nmClient, times(workerCount + masterCount))
-      .launchCommand(any[Container], anyString, anyString, anyString)
-
-    // Worker containers started
-    (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
-
-    // Starts UI server
-    verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
-
-    // Application Ready...
-    val client = TestProbe()
-
-    // Gets active config
-    appMaster.tell(GetActiveConfig("client"), client.ref)
-    client.expectMsgType[ActiveConfig]
-
-    // Queries version
-    appMaster.tell(QueryVersion, client.ref)
-    client.expectMsgType[Version]
-
-    // Queries version
-    appMaster.tell(QueryClusterInfo, client.ref)
-    client.expectMsgType[ClusterInfo]
-
-    // Adds worker
-    val newWorkerCount = 2
-    appMaster.tell(AddWorker(newWorkerCount), client.ref)
-    client.expectMsgType[CommandResult]
-    val newWorkerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
-    verify(rmClient, times(3)).requestContainers(newWorkerResources.capture())
-    assert(newWorkerResources.getValue.size == newWorkerCount)
-
-    // New container allocated
-    appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer))
-    verify(nmClient, times(workerCount + masterCount + newWorkerCount)).
-      launchCommand(any[Container], anyString, anyString, anyString)
-
-    // New worker containers started
-    (0 until newWorkerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
-
-    // Same UI server
-    verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
-
-    // Removes worker
-    appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref)
-    client.expectMsgType[CommandResult]
-    verify(nmClient).stopContainer(any[ContainerId], any[NodeId])
-
-    (appMaster, client: TestProbe, nmClient, rmClient)
-  }
-
-  it should "start master, worker and UI on YARN" in {
-    val env = startAppMaster()
-    val (appMaster, client, nmClient, rmClient) = env
-
-    // Kills the app
-    appMaster.tell(Kill, client.ref)
-    client.expectMsgType[CommandResult]
-    verify(nmClient, times(1)).stop()
-    verify(rmClient, times(1)).shutdownApplication()
-  }
-
-  it should "handle resource manager errors" in {
-    val env = startAppMaster()
-    val (appMaster, client, nmClient, rmClient) = env
-
-    // on error
-    val ex = new Exception("expected resource manager exception")
-    appMaster.tell(ResourceManagerException(ex), client.ref)
-    verify(nmClient, times(1)).stop()
-    verify(rmClient, times(1)).failApplication(ex)
-  }
-}
-
-object YarnAppMasterSpec {
-
-  class UI extends Actor {
-    def receive: Receive = null
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
deleted file mode 100644
index 3bd7f4f..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
-import java.util.Random
-import java.util.zip.{ZipEntry, ZipOutputStream}
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.Try
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
-import io.gearpump.util.FileUtils
-class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  implicit var system: ActorSystem = null
-
-  val rand = new Random()
-
-  private def randomArray(size: Int): Array[Byte] = {
-    val array = new Array[Byte](size)
-    rand.nextBytes(array)
-    array
-  }
-  val appId = ApplicationId.newInstance(0L, 0)
-
-  val akka = ConfigFactory.parseString(
-
-    """
-      |gearpump {
-      |  yarn {
-      |    client {
-      |      package -path = "/user/gearpump/gearpump.zip"
-      |    }
-      |
-      |    applicationmaster {
-      |      ## Memory of YarnAppMaster
-      |        command = "$JAVA_HOME/bin/java -Xmx512m"
-      |      memory = "512"
-      |      vcores = "1"
-      |      queue = "default"
-      |    }
-      |
-      |    master {
-      |      ## Memory of master daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      containers = "2"
-      |      memory = "512"
-      |      vcores = "1"
-      |    }
-      |
-      |    worker {
-      |      ## memory of worker daemon
-      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
-      |      containers = "4"
-      |      ## This also contains all memory for child executors.
-      |      memory = "4096"
-      |      vcores = "1"
-      |    }
-      |    services {
-      |      enabled = true
-      |    }
-      |  }
-      |}
-    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem(getClass.getSimpleName, akka)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "reject non-zip files" in {
-    val yarnConfig = mock(classOf[YarnConfig])
-    val yarnClient = mock(classOf[YarnClient])
-    val fs = mock(classOf[FileSystem])
-    val appMasterResolver = mock(classOf[AppMasterResolver])
-
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
-    val packagePath = "gearpump.zip2"
-    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
-  }
-
-  it should "reject if we cannot find the package file on HDFS" in {
-    val yarnConfig = mock(classOf[YarnConfig])
-    val yarnClient = mock(classOf[YarnClient])
-    val fs = mock(classOf[FileSystem])
-    val appMasterResolver = mock(classOf[AppMasterResolver])
-
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
-    val packagePath = "gearpump.zip"
-    when(fs.exists(anyString)).thenReturn(false)
-    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
-  }
-
-  it should "throw when package exists on HDFS, but the file is corrupted" in {
-    val yarnConfig = mock(classOf[YarnConfig])
-    val yarnClient = mock(classOf[YarnClient])
-    val fs = mock(classOf[FileSystem])
-    val appMasterResolver = mock(classOf[AppMasterResolver])
-
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver)
-    val packagePath = "gearpump.zip"
-    when(fs.exists(anyString)).thenReturn(true)
-
-    val content = new ByteArrayInputStream(randomArray(10))
-    when(fs.open(anyString)).thenReturn(content)
-    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
-    content.close()
-  }
-
-  it should "throw when the HDFS package version is not consistent with local version" in {
-    val yarnConfig = mock(classOf[YarnConfig])
-    val yarnClient = mock(classOf[YarnClient])
-    val fs = mock(classOf[FileSystem])
-    val appMasterResolver = mock(classOf[AppMasterResolver])
-
-    val version = "gearpump-0.2"
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
-      appMasterResolver, version)
-    val packagePath = "gearpump.zip"
-    when(fs.exists(anyString)).thenReturn(true)
-
-    val oldVesion = "gearpump-0.1"
-    when(fs.open(anyString)).thenReturn(zipInputStream(oldVesion))
-    assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
-  }
-
-  it should "upload config file to HDFS when submitting" in {
-    val yarnConfig = mock(classOf[YarnConfig])
-    val yarnClient = mock(classOf[YarnClient])
-    val fs = mock(classOf[FileSystem])
-    val appMasterResolver = mock(classOf[AppMasterResolver])
-
-    val version = "gearpump-0.2"
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
-      fs, system, appMasterResolver, version)
-    val packagePath = "gearpump.zip"
-
-    val out = mock(classOf[OutputStream])
-    when(fs.exists(anyString)).thenReturn(true)
-    when(fs.create(anyString)).thenReturn(out)
-    when(fs.getHomeDirectory).thenReturn("/root")
-
-    when(fs.open(anyString)).thenReturn(zipInputStream(version))
-
-    val report = mock(classOf[ApplicationReport])
-    when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report)
-
-    when(report.getApplicationId).thenReturn(appId)
-    when(yarnClient.createApplication).thenReturn(appId)
-    assert(appId == launcher.submit("gearpump", packagePath))
-
-    // 3 Config files are uploaded to HDFS, one is akka.conf,
-    // one is yarn-site.xml, one is log4j.properties.
-    verify(fs, times(3)).create(anyString)
-    verify(out, times(3)).close()
-
-    // val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
-    // scalastyle:off line.size.limit
-    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster  -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
-    // scalastyle:on line.size.limit
-    verify(yarnClient).submit("gearpump", appId, expectedCommand,
-      Resource.newInstance(512, 1), "default",
-      "gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
-  }
-
-  it should "save active config from Gearpump cluster" in {
-    val yarnConfig = mock(classOf[YarnConfig])
-    val yarnClient = mock(classOf[YarnClient])
-    val fs = mock(classOf[FileSystem])
-    val appMasterResolver = mock(classOf[AppMasterResolver])
-    val appMaster = TestProbe()
-
-    val version = "gearpump-0.2"
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
-      appMasterResolver, version)
-    val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")
-
-    when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref)
-    val fileFuture = launcher.saveConfig(appId, outputPath.getPath)
-    appMaster.expectMsgType[GetActiveConfig]
-    appMaster.reply(ActiveConfig(ConfigFactory.empty()))
-
-    import scala.concurrent.duration._
-    val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]
-
-    assert(!FileUtils.read(file).isEmpty)
-    file.delete()
-  }
-
-  private def zipInputStream(version: String): InputStream = {
-    val bytes = new ByteArrayOutputStream(1000)
-    val zipOut = new ZipOutputStream(bytes)
-
-    // Not available on BufferedOutputStream
-    zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
-    zipOut.write("README".getBytes())
-    // Not available on BufferedOutputStream
-    zipOut.closeEntry()
-    zipOut.close()
-    new ByteArrayInputStream(bytes.toByteArray)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
deleted file mode 100644
index b324ece..0000000
--- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.main.ParseResult
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
-import io.gearpump.experiments.yarn.client.ManageCluster._
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.util.FileUtils
-
-class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  implicit var system: ActorSystem = null
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "getConfig from remote Gearpump" in {
-    val appId = ApplicationId.newInstance(0L, 0)
-    val appMaster = TestProbe()
-    val manager = new ManageCluster(appId, appMaster.ref, system)
-
-    val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
-
-    val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString),
-      Array.empty[String]))
-    appMaster.expectMsgType[GetActiveConfig]
-    appMaster.reply(ActiveConfig(ConfigFactory.empty()))
-    import scala.concurrent.duration._
-    Await.result(future, 30.seconds)
-
-    val content = FileUtils.read(output)
-    assert(content.length > 0)
-    output.delete()
-  }
-
-  it should "addworker" in {
-    val appId = ApplicationId.newInstance(0L, 0)
-    val appMaster = TestProbe()
-    val manager = new ManageCluster(appId, appMaster.ref, system)
-
-    val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString),
-      Array.empty[String]))
-    appMaster.expectMsg(AddWorker(1))
-    appMaster.reply(CommandResult(true))
-    import scala.concurrent.duration._
-    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
-    assert(result.success)
-  }
-
-  it should "removeworker" in {
-    val appId = ApplicationId.newInstance(0L, 0)
-    val appMaster = TestProbe()
-    val manager = new ManageCluster(appId, appMaster.ref, system)
-
-    val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"),
-      Array.empty[String]))
-    appMaster.expectMsg(RemoveWorker("1"))
-    appMaster.reply(CommandResult(true))
-    import scala.concurrent.duration._
-    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
-    assert(result.success)
-  }
-
-  it should "get version" in {
-    val appId = ApplicationId.newInstance(0L, 0)
-    val appMaster = TestProbe()
-    val manager = new ManageCluster(appId, appMaster.ref, system)
-    val future = manager.command(VERSION, new ParseResult(Map("container" -> "1"),
-      Array.empty[String]))
-    appMaster.expectMsg(QueryVersion)
-    appMaster.reply(Version("version 0.1"))
-    import scala.concurrent.duration._
-    val result = Await.result(future, 30.seconds).asInstanceOf[Version]
-    assert(result.version == "version 0.1")
-  }
-
-  it should "get cluster info" in {
-    val appId = ApplicationId.newInstance(0L, 0)
-    val appMaster = TestProbe()
-    val manager = new ManageCluster(appId, appMaster.ref, system)
-
-    val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
-
-    val future = manager.command(QUERY, new ParseResult(Map.empty[String, String],
-      Array.empty[String]))
-    appMaster.expectMsg(QueryClusterInfo)
-    appMaster.reply(ClusterInfo(List("master"), List("worker")))
-    import scala.concurrent.duration._
-    val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo]
-    assert(result.masters.sameElements(List("master")))
-    assert(result.workers.sameElements(List("worker")))
-  }
-
-  it should "kill the cluster" in {
-    val appId = ApplicationId.newInstance(0L, 0)
-    val appMaster = TestProbe()
-    val manager = new ManageCluster(appId, appMaster.ref, system)
-
-    val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
-
-    val future = manager.command(KILL, new ParseResult(Map("container" -> "1"),
-      Array.empty[String]))
-    appMaster.expectMsg(Kill)
-    appMaster.reply(CommandResult(true))
-    import scala.concurrent.duration._
-    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
-    assert(result.success)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
new file mode 100644
index 0000000..c4c5a65
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.transport.HostPort
+
+class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  val config = ConfigFactory.parseString(
+    // YARN settings fixture; each role's `command` entry starts the expected line asserted below.
+    """
+      |
+      |gearpump {
+      |  yarn {
+      |    client {
+      |      package-path = "/user/gearpump/gearpump.zip"
+      |    }
+      |
+      |    applicationmaster {
+      |      ## Memory of YarnAppMaster
+      |        command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |      queue = "default"
+      |    }
+      |
+      |    master {
+      |      ## Memory of master daemon
+      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |    }
+      |
+      |    worker {
+      |      ## memory of worker daemon
+      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      |      containers = "4"
+      |      ## This also contains all memory for child executors.
+      |      memory = "4096"
+      |      vcores = "1"
+      |    }
+      |    services {
+      |      enabled = true
+      |    }
+      |  }
+      |}
+    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+  // The tests compare the full generated command line against a literal expected string.
+  "MasterCommand" should "create correct command line" in {
+    val version = "gearpump-0.1"
+    val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080))
+
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> org.apache.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    assert(master.get == expected)
+  }
+
+  "WorkerCommand" should "create correct command line" in {
+    val version = "gearpump-0.1"
+    val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine")
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine org.apache.gearpump.cluster.main.Worker  2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    assert(worker.get == expected)
+  }
+
+  "AppMasterCommand" should "create correct command line" in {
+    val version = "gearpump-0.1"
+    val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3"))
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster  arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    assert(appmaster.get == expected)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
new file mode 100644
index 0000000..34af145
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  // Actor system shared by the test; created in beforeAll and terminated in afterAll.
+  implicit var system: ActorSystem = null
+
+  // Boots the actor system used by the test; it is named after this spec class.
+  override def beforeAll(): Unit = {
+    system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG)
+  }
+
+  // Shuts the actor system down and blocks until termination has completed.
+  override def afterAll(): Unit = {
+    Await.result(system.terminate(), Duration.Inf)
+  }
+
+  it should "start UI server correctly" in {
+    val uiProbe = TestProbe()
+    val host = "local"
+    val port = 8091
+    val masters = (3000 to 3002).toList.map(p => HostPort("127.0.0.1", p))
+
+    // MockUI reports the arguments of launch() to the probe.
+    val ui = system.actorOf(Props(new MockUI(masters, host, port, uiProbe.ref)))
+
+    // Only the first master is expected to be handed to launch(); the generated config
+    // file must carry the requested UI host and port.
+    uiProbe.expectMsgPF() {
+      case Info(_, masterHost, masterPort, configFile) =>
+        assert(masterHost == "127.0.0.1")
+        assert(masterPort == 3000)
+        val uiConfig = ConfigFactory.parseFile(new java.io.File(configFile))
+        assert(uiConfig.getString(Constants.GEARPUMP_SERVICE_HOST) == host)
+        assert(uiConfig.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091")
+        assert(uiConfig.getString(Constants.NETTY_TCP_HOSTNAME) == host)
+    }
+
+    system.stop(ui)
+  }
+}
+
+object UIServiceSpec {
+  /** Carries the arguments received by `MockUI.launch`. */
+  case class Info(supervisor: String, masterHost: String, masterPort: Int, configFile: String)
+
+  /** UIService whose `launch` only sends its arguments to `probe`, wrapped in an [[Info]]. */
+  class MockUI(masters: List[HostPort], host: String, port: Int, probe: ActorRef)
+    extends UIService(masters, host, port) {
+    override def launch(
+        supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
+      probe.tell(Info(supervisor, masterHost, masterPort, configFile), self)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
new file mode 100644
index 0000000..055e822
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.testkit.{TestActorRef, TestProbe}
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.Constants
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, AppMasterRegistered, ClusterInfo, ContainerStarted, ContainersAllocated, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, ResourceManagerException, Version}
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI
+import org.apache.gearpump.experiments.yarn.glue.Records.{Container, Resource, _}
+import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient}
+import org.apache.gearpump.transport.HostPort
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  val config = ConfigFactory.parseString(
+    // YARN settings fixture; workerCount below is read from it via Constants.WORKER_CONTAINERS.
+    """
+      |gearpump {
+      |  yarn {
+      |    client {
+      |      package-path = "/user/gearpump/gearpump.zip"
+      |    }
+      |
+      |    applicationmaster {
+      |      ## Memory of YarnAppMaster
+      |        command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |      queue = "default"
+      |    }
+      |
+      |    master {
+      |      ## Memory of master daemon
+      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |    }
+      |
+      |    worker {
+      |      ## memory of worker daemon
+      |      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      |      containers = "4"
+      |      ## This also contains all memory for child executors.
+      |      memory = "4096"
+      |      vcores = "1"
+      |    }
+      |    services {
+      |      enabled = true
+      |    }
+      |  }
+      |}
+    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+  val masterCount = 1
+  val workerCount = config.getString(Constants.WORKER_CONTAINERS).toInt
+
+  implicit var system: ActorSystem = null
+  val packagePath = "/user/gearpump/gearpump.zip"
+  val configPath = "/user/my/conf"
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", config)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+  // Walks a YarnAppMaster with mocked clients through startup, scaling and worker removal.
+  private def startAppMaster(): (TestActorRef[YarnAppMaster], TestProbe, NMClient, RMClient) = {
+    val rmClient = mock(classOf[RMClient])
+    val nmClient = mock(classOf[NMClient])
+    val ui = mock(classOf[UIFactory])
+    when(ui.props(any[List[HostPort]], anyString, anyInt)).thenReturn(Props(new UI))
+
+    val appMaster = TestActorRef[YarnAppMaster](Props(
+      new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui)))
+
+    verify(rmClient).start(appMaster)
+    verify(nmClient).start(appMaster)
+    verify(rmClient).registerAppMaster(anyString, anyInt, anyString)
+
+    appMaster ! AppMasterRegistered
+
+    val masterResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+    verify(rmClient).requestContainers(masterResources.capture())
+    assert(masterResources.getValue.size == masterCount)
+
+    val masterContainer = mock(classOf[Container])
+    val mockNode = mock(classOf[NodeId])
+    val mockId = mock(classOf[ContainerId])
+    when(masterContainer.getNodeId).thenReturn(mockNode)
+    when(masterContainer.getId).thenReturn(mockId)
+
+    // Launches master
+    appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer))
+    verify(nmClient,
+      times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString)
+
+    // Master containers started
+    (0 until masterCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+    // Transition to start workers
+    val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+    verify(rmClient, times(2)).requestContainers(workerResources.capture())
+    assert(workerResources.getValue.size == workerCount)
+
+    // Launches workers
+    val workerContainer = mock(classOf[Container])
+    when(workerContainer.getNodeId).thenReturn(mockNode)
+    val workerContainerId = ContainerId.fromString("container_1449802454214_0034_01_000006")
+    when(workerContainer.getId).thenReturn(workerContainerId)
+    appMaster ! ContainersAllocated(List.fill(workerCount)(workerContainer))
+    verify(nmClient, times(workerCount + masterCount))
+      .launchCommand(any[Container], anyString, anyString, anyString)
+
+    // Worker containers started
+    (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+    // Starts UI server
+    verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
+
+    // Application Ready...
+    val client = TestProbe()
+
+    // Gets active config
+    appMaster.tell(GetActiveConfig("client"), client.ref)
+    client.expectMsgType[ActiveConfig]
+
+    // Queries version
+    appMaster.tell(QueryVersion, client.ref)
+    client.expectMsgType[Version]
+
+    // Queries cluster info
+    appMaster.tell(QueryClusterInfo, client.ref)
+    client.expectMsgType[ClusterInfo]
+
+    // Adds worker
+    val newWorkerCount = 2
+    appMaster.tell(AddWorker(newWorkerCount), client.ref)
+    client.expectMsgType[CommandResult]
+    val newWorkerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+    verify(rmClient, times(3)).requestContainers(newWorkerResources.capture())
+    assert(newWorkerResources.getValue.size == newWorkerCount)
+
+    // New container allocated
+    appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer))
+    verify(nmClient, times(workerCount + masterCount + newWorkerCount)).
+      launchCommand(any[Container], anyString, anyString, anyString)
+
+    // New worker containers started
+    (0 until newWorkerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+    // Same UI server
+    verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
+
+    // Removes worker
+    appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref)
+    client.expectMsgType[CommandResult]
+    verify(nmClient).stopContainer(any[ContainerId], any[NodeId])
+
+    (appMaster, client: TestProbe, nmClient, rmClient)
+  }
+
+  it should "start master, worker and UI on YARN" in {
+    val env = startAppMaster()
+    val (appMaster, client, nmClient, rmClient) = env
+
+    // Kills the app
+    appMaster.tell(Kill, client.ref)
+    client.expectMsgType[CommandResult]
+    verify(nmClient, times(1)).stop()
+    verify(rmClient, times(1)).shutdownApplication()
+  }
+
+  it should "handle resource manager errors" in {
+    val env = startAppMaster()
+    val (appMaster, client, nmClient, rmClient) = env
+
+    // on error
+    val ex = new Exception("expected resource manager exception")
+    appMaster.tell(ResourceManagerException(ex), client.ref)
+    verify(nmClient, times(1)).stop()
+    verify(rmClient, times(1)).failApplication(ex)
+  }
+}
+
+object YarnAppMasterSpec {
+  /** Stand-in UI actor returned by the mocked UIFactory; it handles no messages. */
+  class UI extends Actor {
+    def receive: Receive = Actor.emptyBehavior
+  }
+}
\ No newline at end of file