You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:32 UTC

[22/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
deleted file mode 100644
index 3e50abe..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import java.io.{File, IOException}
-import java.net.InetAddress
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{ActorRef, ActorSystem}
-
-import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig}
-import io.gearpump.util.ActorUtil.askActor
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/**
- * Manages a running Gearpump cluster on YARN by sending requests to its application master.
- *
- * @param appId id of the YARN application; not referenced by any method of this class
- * @param appMaster actor that receives every request issued here
- * @param system actor system whose dispatcher runs the callbacks on the returned futures
- */
-class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) {
-  import io.gearpump.experiments.yarn.client.ManageCluster._
-
-  // Host name of the local machine, sent along with GetActiveConfig.
-  private val host = InetAddress.getLocalHost.getHostName
-  implicit val dispatcher = system.dispatcher
-
-  /** Asks the application master for its active configuration. */
-  def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host))
-
-  /** Asks the application master for its version. */
-  def version: Future[Version] = askActor[Version](appMaster, QueryVersion)
-
-  /** Sends an AddWorker request for `count` workers. */
-  def addWorker(count: Int): Future[CommandResult] = {
-    askActor[CommandResult](appMaster, AddWorker(count))
-  }
-
-  /** Sends a RemoveWorker request for the worker identified by `worker`. */
-  def removeWorker(worker: String): Future[CommandResult] = {
-    askActor[CommandResult](appMaster, RemoveWorker(worker))
-  }
-
-  /** Sends a Kill request to the application master. */
-  def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill)
-
-  /** Sends a QueryClusterInfo request to the application master. */
-  def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo)
-
-  /**
-   * Dispatches a command given by name to the matching request method.
-   *
-   * @param command one of the names listed in `ManageCluster.commands`
-   * @param parsed parsed command line options, consulted for OUTPUT, COUNT and CONTAINER
-   * @return the future reply of the application master
-   * @throws IOException when an option required by the command is missing
-   * @throws IllegalArgumentException when `command` is not a known command name
-   */
-  def command(command: String, parsed: ParseResult): Future[AnyRef] = {
-    command match {
-      case GET_CONFIG =>
-        if (parsed.exists(OUTPUT)) {
-          getConfig.map { conf =>
-            // Persist the received configuration before handing it back to the caller.
-            ClusterConfig.saveConfig(conf.config, new File(parsed.getString(OUTPUT)))
-            conf
-          }
-        } else {
-          throw new IOException(s"Please specify -$OUTPUT option")
-        }
-      case ADD_WORKER =>
-        val count = parsed.getString(COUNT).toInt
-        addWorker(count)
-      case REMOVE_WORKER =>
-        val containerId = parsed.getString(CONTAINER)
-        if (containerId == null || containerId.isEmpty) {
-          throw new IOException(s"Please specify -$CONTAINER option")
-        } else {
-          removeWorker(containerId)
-        }
-      case KILL =>
-        shutdown
-      case QUERY =>
-        queryClusterInfo
-      case VERSION =>
-        version
-      case unknown =>
-        // Without this case an unsupported command surfaces as an opaque scala.MatchError.
-        throw new IllegalArgumentException(
-          s"Unknown command '$unknown', expected one of: ${commands.mkString(", ")}")
-    }
-  }
-}
-
-/** Command line entry point and option names for managing a Gearpump cluster on YARN. */
-object ManageCluster extends AkkaApp with ArgumentsParser {
-  // Supported values of the COMMAND option.
-  val GET_CONFIG = "getconfig"
-  val ADD_WORKER = "addworker"
-  val REMOVE_WORKER = "removeworker"
-  val KILL = "kill"
-  val VERSION = "version"
-  val QUERY = "query"
-
-  // Names of the command line options.
-  val COMMAND = "command"
-  val CONTAINER = "container"
-  val OUTPUT = "output"
-  val COUNT = "count"
-  val APPID = "appid"
-  val VERBOSE = "verbose"
-
-  val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY)
-
-  import scala.concurrent.duration._
-  // Maximum time main waits for the reply of the application master.
-  val TIME_OUT_SECONDS = 30.seconds
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true),
-    APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>",
-    required = true),
-    COUNT -> CLIOption("<how many instance to add or remove>", required = false,
-      defaultValue = Some(1)),
-    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
-      defaultValue = Some(false)),
-    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
-      defaultValue = Some("")),
-    CONTAINER -> CLIOption("<container id for master or worker>", required = false,
-      defaultValue = Some(""))
-  )
-
-  /**
-   * Parses the arguments, resolves the application master of the given application, runs the
-   * requested command and prints its result.
-   *
-   * The actor system is terminated even when resolving the application master, running the
-   * command or waiting for the reply fails.
-   */
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    val yarnConfig = new YarnConfig()
-    val yarnClient = new YarnClient(yarnConfig)
-
-    val parsed = parse(args)
-
-    if (parsed.getBoolean(VERBOSE)) {
-      LogUtil.verboseLogToConsole()
-    }
-
-    val appId = parseAppId(parsed.getString(APPID))
-    val system = ActorSystem("manageCluster", akkaConf)
-
-    try {
-      val appMaster = new AppMasterResolver(yarnClient, system).resolve(appId)
-      val manager = new ManageCluster(appId, appMaster, system)
-      val result = manager.command(parsed.getString(COMMAND), parsed)
-
-      // scalastyle:off println
-      Console.println(Await.result(result, TIME_OUT_SECONDS))
-      // scalastyle:on println
-    } finally {
-      // Always shut the actor system down, otherwise a failure or timeout above leaves it running.
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-
-  /**
-   * Parses an application id of the form `application_timestamp_id`.
-   *
-   * @param str textual application id; the second and third `_`-separated parts are used
-   * @return the application id built from the timestamp and the numeric id
-   * @throws IllegalArgumentException when `str` has fewer than three `_`-separated parts
-   * @throws NumberFormatException when the timestamp or the id part is not numeric
-   */
-  def parseAppId(str: String): ApplicationId = {
-    str.split("_") match {
-      case Array(_, timestamp, id, _*) =>
-        ApplicationId.newInstance(timestamp.toLong, id.toInt)
-      case _ =>
-        throw new IllegalArgumentException(
-          s"Invalid application id '$str', expected format: application_timestamp_id")
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
deleted file mode 100644
index 6b3385f..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import java.io.File
-import java.nio.ByteBuffer
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path}
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.mapreduce.security.TokenCache
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.slf4j.Logger
-
-import io.gearpump.util.LogUtil
-
-/** Factory for the YARN launch context (command, environment, tokens, local resources). */
-private[glue]
-object ContainerLaunchContext {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  /**
-   * Builds a launch context for a container.
-   *
-   * @param yarnConf YARN configuration used for the class path, file system and tokens
-   * @param command the single command set on the context
-   * @param packagePath path registered as the "pack" archive resource
-   * @param configPath path registered as the "conf" file resource
-   */
-  def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String)
-    : ContainerLaunchContext = {
-    val context = Records.newRecord(classOf[ContainerLaunchContext])
-    context.setCommands(Seq(command).asJava)
-    context.setEnvironment(getAppEnv(yarnConf).asJava)
-    context.setTokens(getToken(yarnConf, packagePath, configPath))
-    context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava)
-    context
-  }
-
-  private def getFs(yarnConf: YarnConfiguration) = YarnFileSystem.get(yarnConf)
-
-  /**
-   * Builds the environment of the container: a CLASSPATH made of the configured YARN
-   * application class path followed by every entry of the container working directory.
-   */
-  private def getAppEnv(yarnConf: YarnConfiguration): Map[String, String] = {
-    val classPaths = yarnConf.getStrings(
-      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.mkString(File.pathSeparator))
-    val workingDirEntries = Environment.PWD.$() + File.separator + "*"
-
-    // The appended array must be kept: `:+` returns a new array and leaves its receiver
-    // untouched, so discarding the result drops the working directory from the class path.
-    // No trailing separator is added because mkString below already joins the entries.
-    val allPaths = Option(classPaths).getOrElse(Array.empty[String]) :+ workingDirEntries
-
-    Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator))
-  }
-
-  /** Registers the package as an archive ("pack") and the configuration as a file ("conf"). */
-  private def getAMLocalResourcesMap(
-      yarnConf: YarnConfiguration, packagePath: String, configPath: String)
-    : Map[String, LocalResource] = {
-    val fs = getFs(yarnConf)
-
-    Map(
-      "pack" -> newYarnAppResource(fs, new Path(packagePath),
-        LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION),
-      "conf" -> newYarnAppResource(fs, new Path(configPath),
-        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION))
-  }
-
-  /**
-   * Describes `path` as a local resource, taking size and timestamp from its file status.
-   */
-  private def newYarnAppResource(
-      fs: YarnFileSystem, path: Path,
-      resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
-    val qualified = fs.makeQualified(path)
-    val status = fs.getFileStatus(qualified)
-    val resource = Records.newRecord(classOf[LocalResource])
-    resource.setType(resourceType)
-    resource.setVisibility(vis)
-    resource.setResource(ConverterUtils.getYarnUrlFromPath(qualified))
-    resource.setTimestamp(status.getModificationTime)
-    resource.setSize(status.getLen)
-    resource
-  }
-
-  /**
-   * Serializes the credentials of the current user, after obtaining tokens for the name
-   * nodes of the package and configuration paths.
-   */
-  private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String)
-    : ByteBuffer = {
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    val dob = new DataOutputBuffer
-    val dirs = Array(new Path(packagePath), new Path(configPath))
-    TokenCache.obtainTokensForNamenodes(credentials, dirs, yc)
-    credentials.writeTokenStorageToStream(dob)
-    // getData exposes the whole backing array; only the first getLength bytes are valid.
-    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
deleted file mode 100644
index acf09ac..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import java.io.{InputStream, OutputStream}
-import java.net.ConnectException
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.util.LogUtil
-
-/**
- * Thin wrapper around the Hadoop file system selected by the given configuration.
- *
- * open, create and exists log a hint when they fail with a ConnectException and then
- * rethrow the original exception.
- */
-class FileSystem(yarnConfig: YarnConfig) {
-
-  private val conf = yarnConfig.conf
-  private val fs = org.apache.hadoop.fs.FileSystem.get(conf)
-
-  private def LOG = LogUtil.getLogger(getClass)
-
-  /** Opens `file` for reading. */
-  def open(file: String): InputStream = exceptionHandler {
-    val path = new Path(file)
-    fs.open(path)
-  }
-
-  /** Creates `file` and returns a stream to write to it. */
-  def create(file: String): OutputStream = exceptionHandler {
-    val path = new Path(file)
-    fs.create(path)
-  }
-
-  /** Tells whether `file` exists. */
-  def exists(file: String): Boolean = exceptionHandler {
-    val path = new Path(file)
-    fs.exists(path)
-  }
-
-  /** URI of the underlying file system, as a string. */
-  def name: String = {
-    fs.getUri.toString
-  }
-
-  /** Home directory reported by the underlying file system, as a string. */
-  def getHomeDirectory: String = {
-    fs.getHomeDirectory.toString
-  }
-
-  /**
-   * Runs `call` and rethrows whatever it throws; a ConnectException is additionally logged
-   * with a hint about the configured file system.
-   */
-  private def exceptionHandler[T](call: => T): T = {
-    Try(call) match {
-      case Success(v) => v
-      case Failure(ex: ConnectException) =>
-        // Only the $name part is interpolated; $HADOOP_HOME must stay literal in the message.
-        LOG.error("Please check whether we connect to the right HDFS file system, " +
-          s"current file system is $name." + "\n. Please copy all configs under " +
-          "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " +
-          "so that we can use the right File system.", ex)
-        throw ex
-      case Failure(ex) =>
-        throw ex
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
deleted file mode 100644
index 3e7e668..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import java.nio.ByteBuffer
-
-import akka.actor.ActorRef
-import com.typesafe.config.Config
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource}
-import org.apache.hadoop.yarn.client.api.async.NMClientAsync
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
-
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
-/**
- * Adapter for the asynchronous node manager client.
- *
- * Container callbacks are logged; a started container is also reported to the actor
- * passed to `start`.
- */
-class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.CallbackHandler {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  // Both fields stay null until start() is called.
-  private var listener: ActorRef = null
-  private var nmClient: NMClientAsyncImpl = null
-
-  /** Creates, initializes and starts the underlying client; `reportTo` receives the events. */
-  def start(reportTo: ActorRef): Unit = {
-    LOG.info("Starting Node Manager Client NMClient...")
-    listener = reportTo
-    nmClient = new NMClientAsyncImpl(this)
-    nmClient.init(yarnConf.conf)
-    nmClient.start()
-  }
-
-  /** Logs the started container and forwards a ContainerStarted message. */
-  private[glue]
-  override def onContainerStarted(
-      containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer])
-    : Unit = {
-    LOG.info(s"Container started : $containerId, $allServiceResponse")
-    listener ! ContainerStarted(containerId)
-  }
-
-  /** Logs the received container status. */
-  private[glue]
-  override def onContainerStatusReceived(
-      containerId: YarnContainerId, containerStatus: YarnContainerStatus): Unit = {
-    LOG.info(s"Container status received : $containerId, status $containerStatus")
-  }
-
-  /** Logs the stopped container. */
-  private[glue]
-  override def onContainerStopped(containerId: YarnContainerId): Unit = {
-    LOG.error(s"Container stopped : $containerId")
-  }
-
-  /** Logs a failure to query the container status. */
-  private[glue]
-  override def onGetContainerStatusError(
-      containerId: YarnContainerId, throwable: Throwable): Unit = {
-    LOG.error(s"Container exception : $containerId", throwable)
-  }
-
-  /** Logs a failure to start the container. */
-  private[glue]
-  override def onStartContainerError(
-      containerId: YarnContainerId, throwable: Throwable): Unit = {
-    LOG.error(s"Container exception : $containerId", throwable)
-  }
-
-  /** Logs a failure to stop the container. */
-  private[glue]
-  override def onStopContainerError(
-      containerId: YarnContainerId, throwable: Throwable): Unit = {
-    LOG.error(s"Container exception : $containerId", throwable)
-  }
-
-  /** Builds a launch context for `command` and asks the client to start the container. */
-  def launchCommand(
-      container: Container, command: String, packagePath: String, configPath: String): Unit = {
-    val id = container.getId
-    val host = container.getNodeId.getHost
-    LOG.info(s"Launching command : $command on container:  $id, host ip : $host")
-    val launchContext = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath)
-    nmClient.startContainerAsync(container, launchContext)
-  }
-
-  /** Asks the client to stop the given container on the given node. */
-  def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = {
-    LOG.info("Stop container " + containerId.toString + " on node: " + nodeId.toString + " ")
-    nmClient.stopContainerAsync(containerId, nodeId)
-  }
-
-  /** Stops the underlying client. */
-  def stop(): Unit = {
-    LOG.info("Shutdown NMClient")
-    nmClient.stop()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
deleted file mode 100644
index 7b9d83c..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import scala.collection.JavaConverters._
-
-import akka.actor.ActorRef
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource}
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
-import org.slf4j.Logger
-
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
-
-/**
- * Adapter for the asynchronous resource manager client.
- *
- * Callbacks from the resource manager are translated into messages sent to the actor
- * passed to `start`.
- */
-class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  // Both fields stay null until start() is called.
-  private var reportTo: ActorRef = null
-  private var client: AMRMClientAsync[ContainerRequest] = null
-
-  /** Starts the underlying client; `reportTo` receives all subsequent events. */
-  def start(reportTo: ActorRef): Unit = {
-    LOG.info("Starting Resource Manager Client RMClient...")
-    this.reportTo = reportTo
-    client = startAMRMClient
-  }
-
-  /** Creates, initializes and starts the client with a 1000 ms interval. */
-  private def startAMRMClient: AMRMClientAsync[ContainerRequest] = {
-    val timeIntervalMs = 1000 // ms
-    val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this)
-    amrmClient.init(yarnConf.conf)
-    amrmClient.start()
-    amrmClient
-  }
-
-  /** Always reports a fixed progress of 0.5. */
-  override def getProgress: Float = 0.5F
-
-  // Ids of the containers already reported, used to drop repeated allocations.
-  private var allocatedContainers = Set.empty[YarnContainerId]
-
-  /** Reports the containers that were not reported before. */
-  private[glue]
-  override def onContainersAllocated(containers: java.util.List[YarnContainer]): Unit = {
-    val newContainers = containers.asScala.toList.filterNot(container =>
-      allocatedContainers.contains(container.getId))
-    allocatedContainers ++= newContainers.map(_.getId)
-    LOG.info(s"New allocated ${newContainers.size} containers")
-    reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_)))
-  }
-
-  /** Reports the statuses of the completed containers. */
-  private[glue]
-  override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus])
-    : Unit = {
-    LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}")
-    reportTo ! ContainersCompleted(
-      completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
-  }
-
-  /** Logs the error with its stack trace and reports it as a ResourceManagerException. */
-  private[glue]
-  override def onError(ex: Throwable): Unit = {
-    LOG.error("Error occurred", ex)
-    reportTo ! ResourceManagerException(ex)
-  }
-
-  /** Only logs the notification. */
-  private[glue]
-  override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = {
-    LOG.info("onNodesUpdates")
-  }
-
-  /** Forwards the shutdown request as a ShutdownApplication message. */
-  private[glue]
-  override def onShutdownRequest(): Unit = {
-    LOG.info("Shutdown requested")
-    reportTo ! ShutdownApplication
-  }
-
-  /** Adds one container request per entry of `containers`. */
-  def requestContainers(containers: List[Resource]): Unit = {
-    LOG.info(s"request Resource, slots: ${containers.length}, ${containers.mkString("\n")}")
-    containers.foreach { resource =>
-      client.addContainerRequest(createContainerRequest(resource))
-    }
-  }
-
-  /** Builds a request of priority 0 for `capability`, with no node or rack constraint. */
-  private def createContainerRequest(capability: Resource): ContainerRequest = {
-    LOG.info("creating ContainerRequest")
-    val priority = Priority.newInstance(0)
-    new ContainerRequest(capability, null, null, priority)
-  }
-
-  /** Registers the application master and reports AppMasterRegistered afterwards. */
-  def registerAppMaster(host: String, port: Int, url: String): Unit = {
-    LOG.info(s"Received RegisterAMMessage! $host:$port $url")
-    client.registerApplicationMaster(host, port, url)
-    LOG.info("Received RegisterAppMasterResponse ")
-    reportTo ! AppMasterRegistered
-  }
-
-  /** Unregisters the application master as SUCCEEDED and stops the client. */
-  def shutdownApplication(): Unit = {
-    LOG.info("Shutdown application")
-    client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "success", null)
-    client.stop()
-  }
-
-  /**
-   * Unregisters the application master as FAILED, using the message of `ex` as the
-   * diagnostics, and stops the client.
-   */
-  def failApplication(ex: Throwable): Unit = {
-    LOG.error("Application failed! ", ex)
-    // A failed application must not be reported to the resource manager as SUCCEEDED.
-    client.unregisterApplicationMaster(FinalApplicationStatus.FAILED, ex.getMessage, null)
-    client.stop()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
deleted file mode 100644
index 7b863fa..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import scala.language.implicitConversions
-
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, YarnApplicationState}
-import org.apache.hadoop.yarn.util.{Records => YarnRecords}
-
-/**
- * Wrapper types around the YARN record classes, together with the implicit conversions
- * between each wrapper and the YARN type it wraps.
- */
-object Records {
-  /** Delegates to the YARN record factory. */
-  def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz)
-
-  /** Creates an empty application submission context. */
-  def newAppSubmissionContext: ApplicationSubmissionContext =
-    YarnRecords.newRecord(classOf[ApplicationSubmissionContext])
-
-  /** Wrapper of a YARN application id; equality and hash code delegate to the wrapped id. */
-  class ApplicationId(private[glue] val impl: YarnApplicationId) {
-    def getId: Int = impl.getId
-
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: ApplicationId => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  object ApplicationId {
-    def newInstance(timestamp: Long, id: Int): ApplicationId =
-      new ApplicationId(YarnApplicationId.newInstance(timestamp, id))
-  }
-
-  /** Wrapper of a YARN application report. */
-  class ApplicationReport(private[glue] val impl: YarnApplicationReport) {
-    def getApplicationId: ApplicationId = new ApplicationId(impl.getApplicationId)
-
-    def getDiagnostics: String = impl.getDiagnostics
-
-    def getFinishTime: Long = impl.getFinishTime
-
-    def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl
-
-    def getYarnApplicationState: YarnApplicationState = impl.getYarnApplicationState
-
-    override def toString: String = impl.toString
-  }
-
-  /** Wrapper of a YARN resource; equality and hash code delegate to the wrapped resource. */
-  class Resource(private[glue] val impl: YarnResource) {
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: Resource => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  object Resource {
-    def newInstance(memory: Int, vCores: Int): Resource =
-      new Resource(YarnResource.newInstance(memory, vCores))
-  }
-
-  /** Wrapper of a YARN container; equality and hash code delegate to the wrapped container. */
-  class Container(private[glue] val impl: YarnContainer) {
-    def getId: ContainerId = new ContainerId(impl.getId)
-
-    def getNodeHttpAddress: String = impl.getNodeHttpAddress
-
-    def getNodeId: NodeId = new NodeId(impl.getNodeId)
-
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: Container => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  /** Wrapper of a YARN container id; equality and hash code delegate to the wrapped id. */
-  class ContainerId(private[glue] val impl: YarnContainerId) {
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: ContainerId => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  object ContainerId {
-    def fromString(worker: String): ContainerId =
-      new ContainerId(YarnContainerId.fromString(worker))
-  }
-
-  /** Wrapper of a YARN node id; equality and hash code delegate to the wrapped id. */
-  class NodeId(private[glue] val impl: YarnNodeId) {
-    def getHost: String = impl.getHost
-
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: NodeId => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  /** Wrapper of a YARN container status. */
-  class ContainerStatus(private[glue] val impl: YarnContainerStatus) {
-    def getDiagnostics: String = impl.getDiagnostics
-
-    def getContainerId: ContainerId = new ContainerId(impl.getContainerId)
-
-    def getExitStatus: Int = impl.getExitStatus
-
-    override def toString: String = impl.toString
-  }
-
-  // Conversions between each wrapper and its YARN type, visible inside the glue package only.
-
-  private[glue] implicit def yarnResourceToResource(res: YarnResource): Resource =
-    new Resource(res)
-
-  private[glue] implicit def resourceToYarnResource(res: Resource): YarnResource =
-    res.impl
-
-  private[glue] implicit def yarnAppIdToAppId(yarn: YarnApplicationId): ApplicationId =
-    new ApplicationId(yarn)
-
-  private[glue] implicit def appIdToYarnAppId(app: ApplicationId): YarnApplicationId =
-    app.impl
-
-  private[glue] implicit def yarnReportToReport(yarn: YarnApplicationReport): ApplicationReport =
-    new ApplicationReport(yarn)
-
-  private[glue] implicit def reportToYarnReport(app: ApplicationReport): YarnApplicationReport =
-    app.impl
-
-  private[glue] implicit def yarnContainerToContainer(yarn: YarnContainer): Container =
-    new Container(yarn)
-
-  private[glue] implicit def containerToYarnContainer(app: Container): YarnContainer =
-    app.impl
-
-  private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus)
-    : ContainerStatus =
-    new ContainerStatus(yarn)
-
-  private[glue] implicit def containerStatusToYarnContainerStatus(app: ContainerStatus)
-    : YarnContainerStatus =
-    app.impl
-
-  private[glue] implicit def containerIdToYarnContainerId(app: ContainerId): YarnContainerId =
-    app.impl
-
-  private[glue] implicit def yarnContainerIdToContainerId(yarn: YarnContainerId): ContainerId =
-    new ContainerId(yarn)
-
-  private[glue] implicit def nodeIdToYarnNodeId(app: NodeId): YarnNodeId =
-    app.impl
-
-  private[glue] implicit def yarnNodeIdToNodeId(yarn: YarnNodeId): NodeId =
-    new NodeId(yarn)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
deleted file mode 100644
index db7d5d7..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import org.apache.hadoop.yarn.api.records.YarnApplicationState
-import org.apache.hadoop.yarn.client.api
-
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
-
-/**
- * Adapter for api.YarnClient
- */
-class YarnClient(yarn: YarnConfig) {
-
-  val LOG = LogUtil.getLogger(getClass)
-
-  private val client: api.YarnClient = api.YarnClient.createYarnClient
-  client.init(yarn.conf)
-  client.start()
-  LOG.info("Starting YarnClient...")
-
-  def createApplication: ApplicationId = {
-    val app = client.createApplication()
-    val response = app.getNewApplicationResponse()
-    LOG.info("Create application, appId: " + response.getApplicationId())
-    response.getApplicationId()
-  }
-
-  def getApplicationReport(appId: ApplicationId): ApplicationReport = {
-    client.getApplicationReport(appId)
-  }
-
-  def submit(
-      name: String, appId: ApplicationId, command: String, resource: Resource, queue: String,
-      packagePath: String, configPath: String): ApplicationId = {
-
-    val appContext = Records.newAppSubmissionContext
-    appContext.setApplicationName(name)
-    appContext.setApplicationId(appId)
-
-    val containerContext = ContainerLaunchContext(yarn.conf, command, packagePath, configPath)
-    appContext.setAMContainerSpec(containerContext)
-    appContext.setResource(resource)
-    appContext.setQueue(queue)
-
-    LOG.info(s"Submit Application $appId to YARN...")
-    client.submitApplication(appContext)
-  }
-
-  def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue)
-    : ApplicationReport = {
-    import org.apache.hadoop.yarn.api.records.YarnApplicationState._
-    val terminated = Set(FINISHED, KILLED, FAILED, RUNNING)
-    var result: ApplicationReport = null
-    var done = false
-
-    val start = System.currentTimeMillis()
-    def timeout: Boolean = {
-      val now = System.currentTimeMillis()
-      if (now - start > timeoutMilliseconds) {
-        true
-      } else {
-        false
-      }
-    }
-
-    while (!done && !timeout) {
-      val report = client.getApplicationReport(appId)
-      val status = report.getYarnApplicationState
-      if (terminated.contains(status)) {
-        done = true
-        result = report
-      } else {
-        Console.print(".")
-        Thread.sleep(1000)
-      }
-    }
-
-    if (timeout) {
-      throw new Exception(s"Launch Application $appId timeout...")
-    }
-    result
-  }
-
-  def stop(): Unit = {
-    client.stop()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
deleted file mode 100644
index 87f199a..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-/**
- * wrapper for yarn configuration
- */
-case class YarnConfig(conf: YarnConfiguration = new YarnConfiguration(new Configuration(true))) {
-  def writeXml(out: java.io.Writer): Unit = conf.writeXml(out)
-
-  def resourceManager: String = {
-    conf.get("yarn.resourcemanager.hostname")
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
deleted file mode 100644
index 24adbaa..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn
-
-
-/**
- * YARN facade to decouple Gearpump with YARN.
- */
-package object glue {
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
new file mode 100644
index 0000000..33c3e97
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn
+
+/**
+ * Configuration keys and YARN placeholders shared by the YARN experiment module.
+ */
+object Constants {
+  import org.apache.hadoop.yarn.api.ApplicationConstants
+  import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+
+  // Keys under gearpump.yarn.applicationmaster
+  val APPMASTER_NAME = "gearpump.yarn.applicationmaster.name"
+  val APPMASTER_COMMAND = "gearpump.yarn.applicationmaster.command"
+  val APPMASTER_MEMORY = "gearpump.yarn.applicationmaster.memory"
+  val APPMASTER_VCORES = "gearpump.yarn.applicationmaster.vcores"
+  val APPMASTER_QUEUE = "gearpump.yarn.applicationmaster.queue"
+
+  // Keys under gearpump.yarn.client
+  val PACKAGE_PATH = "gearpump.yarn.client.package-path"
+  val CONFIG_PATH = "gearpump.yarn.client.config-path"
+
+  // Keys under gearpump.yarn.master
+  val MASTER_COMMAND = "gearpump.yarn.master.command"
+  val MASTER_MEMORY = "gearpump.yarn.master.memory"
+  val MASTER_VCORES = "gearpump.yarn.master.vcores"
+
+  // Keys under gearpump.yarn.worker
+  val WORKER_COMMAND = "gearpump.yarn.worker.command"
+  val WORKER_CONTAINERS = "gearpump.yarn.worker.containers"
+  val WORKER_MEMORY = "gearpump.yarn.worker.memory"
+  val WORKER_VCORES = "gearpump.yarn.worker.vcores"
+
+  val SERVICES_ENABLED = "gearpump.yarn.services.enabled"
+
+  // Values provided by Hadoop's ApplicationConstants. Presumably these are placeholders
+  // which YARN expands when the container is launched -- confirm against the Hadoop Javadoc.
+  val LOCAL_DIRS = Environment.LOCAL_DIRS.$$()
+  val CONTAINER_ID = Environment.CONTAINER_ID.$$()
+  val LOG_DIR_EXPANSION_VAR = ApplicationConstants.LOG_DIR_EXPANSION_VAR
+  val NODEMANAGER_HOST = Environment.NM_HOST.$$()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
new file mode 100644
index 0000000..711506a
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.main.{Master, Worker}
+import org.apache.gearpump.experiments.yarn.Constants._
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants
+
+/** Command to start a YARN container */
+trait Command {
+  /** Returns the command as a single string. */
+  def get: String
+
+  /** Same text as [[get]]. */
+  override def toString: String = get
+}
+
+/**
+ * Base class of the container commands. Assembles a command line from an executable looked up
+ * in the configuration, a class path, JVM properties, a main class and CLI options.
+ */
+abstract class AbstractCommand extends Command {
+  /** Configuration in which the executable of the command is looked up. */
+  protected def config: Config
+
+  /** Version string; used here as the directory name under `pack/`. */
+  def version: String
+
+  /** Class path entries which are placed in front of `$CLASSPATH`. */
+  def classPath: Array[String] = {
+    val packRoot = s"pack/$version"
+    Array("conf", s"$packRoot/conf", s"$packRoot/lib/daemon/*", s"$packRoot/lib/*")
+  }
+
+  /**
+   * Builds the full command line. Both stdout and stderr of the process are appended to the
+   * `stderr` file under the log directory placeholder.
+   *
+   * @param java configuration key whose value is the executable to run
+   * @param properties JVM properties, joined by a single space
+   * @param mainClazz name of the main class
+   * @param cliOpts command line options, joined by a single space
+   */
+  protected def buildCommand(
+      java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String])
+    : String = {
+    val executable = config.getString(java)
+    val classPathArg = classPath.mkString(":") + ":$CLASSPATH"
+    val jvmProperties = properties.mkString(" ")
+    val arguments = cliOpts.mkString(" ")
+
+    s"$executable -cp $classPathArg $jvmProperties $mainClazz $arguments " +
+      s"2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
+  }
+
+  /** Class name of `any`, with one trailing `$` removed if there is one. */
+  protected def clazz(any: AnyRef): String = any.getClass.getName.stripSuffix("$")
+}
+
+/** Command which starts a Gearpump master listening on `masterAddr`. */
+case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
+  extends AbstractCommand {
+
+  def get: String = {
+    val host = masterAddr.host
+    val port = masterAddr.port
+    val gearpumpHome = s"${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version"
+
+    val jvmProperties = Array(
+      s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$host:$port",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$host",
+      s"-D${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
+      s"-D${Constants.GEARPUMP_HOME}=$gearpumpHome",
+      s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
+      s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}")
+
+    val cliOptions = Array(s"-ip $host", s"-port $port")
+
+    buildCommand(MASTER_COMMAND, jvmProperties, clazz(Master), cliOptions)
+  }
+}
+
+/** Command which starts a Gearpump worker on `workerHost`, pointing at `masterAddr`. */
+case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String)
+  extends AbstractCommand {
+
+  def get: String = {
+    val master = s"${masterAddr.host}:${masterAddr.port}"
+    val gearpumpHome = s"${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version"
+
+    val jvmProperties = Array(
+      s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$master",
+      s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
+      s"-D${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
+      s"-D${Constants.GEARPUMP_HOME}=$gearpumpHome",
+      s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost")
+
+    // The worker takes no command line options
+    val noOptions = Array.empty[String]
+    buildCommand(WORKER_COMMAND, jvmProperties, clazz(Worker), noOptions)
+  }
+}
+
+/** Command which starts the YARN application master with the given arguments. */
+case class AppMasterCommand(config: Config, version: String, args: Array[String])
+  extends AbstractCommand {
+
+  // Compared with the default class path this adds the dashboard, services and yarn entries
+  override val classPath = {
+    val packRoot = s"pack/$version"
+    Array(
+      "conf",
+      s"$packRoot/conf",
+      s"$packRoot/dashboard",
+      s"$packRoot/lib/*",
+      s"$packRoot/lib/daemon/*",
+      s"$packRoot/lib/services/*",
+      s"$packRoot/lib/yarn/*"
+    )
+  }
+
+  def get: String = {
+    val gearpumpHome = s"${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version"
+
+    val jvmProperties = Array(
+      s"-D${Constants.GEARPUMP_HOME}=$gearpumpHome",
+      s"-D${Constants.GEARPUMP_FULL_SCALA_VERSION}=$version",
+      s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
+      s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=${NODEMANAGER_HOST}")
+
+    // The leading empty option is kept so that the generated command text stays the same
+    val cliOptions = "" +: args
+
+    buildCommand(APPMASTER_COMMAND, jvmProperties, clazz(YarnAppMaster), cliOptions)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala
new file mode 100644
index 0000000..1194f0b
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor._
+import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.services.main.Services
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil}
+
+import scala.concurrent.Future
+
+/** Factory of the Props which are used to create the UI actor. */
+trait UIFactory {
+  /**
+   * @param masters addresses of the masters
+   * @param host presumably the host the UI server binds to -- confirm with the implementation
+   * @param port presumably the port the UI server binds to -- confirm with the implementation
+   */
+  def props(masters: List[HostPort], host: String, port: Int): Props
+}
+
+/**
+ * Wrapper of UI server.
+ *
+ * On start the actor writes the actor system configuration, overridden with the service host
+ * and port, to a temporary file and launches [[Services]] with it in a Future. On stop the
+ * temporary file is deleted and the services are killed.
+ *
+ * @param masters addresses of the masters; only the first one is passed to the UI server
+ * @param host value written to the service host and netty hostname settings
+ * @param port value written to the service http setting
+ */
+class UIService(masters: List[HostPort], host: String, port: Int) extends Actor {
+  private val LOG = LogUtil.getLogger(getClass)
+
+  private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path)
+
+  // Captured on the actor thread, because start() runs inside a Future and must not use
+  // `context` from there
+  private val systemConfig = context.system.settings.config
+
+  // Written by the Future started in preStart() and read by postStop(), hence @volatile
+  @volatile private var configFile: java.io.File = null
+
+  private implicit val dispatcher = context.dispatcher
+
+  override def postStop(): Unit = {
+    Option(configFile).foreach { file =>
+      file.delete()
+      configFile = null
+    }
+
+    // TODO: fix this
+    // Hack around to Kill the UI server
+    Services.kill()
+  }
+
+  override def preStart(): Unit = {
+    // A failed launch would otherwise be dropped silently with the Future
+    Future(start()).failed.foreach { ex =>
+      LOG.error(s"Failed to launch UI services: ${ex.getMessage}", ex)
+    }
+  }
+
+  /**
+   * Saves the effective configuration to a temporary file and launches the UI server.
+   *
+   * @throws IllegalArgumentException if no master address was given
+   */
+  def start(): Unit = {
+    // Checked first so that no temporary file is left behind when there is no master
+    require(masters.nonEmpty, "At least one master address is required to launch the UI server")
+
+    val mastersArg = masters.mkString(",")
+    LOG.info(s"Launching services -master $mastersArg")
+
+    configFile = java.io.File.createTempFile("uiserver", ".conf")
+
+    val config = systemConfig.
+      withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)).
+      withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)).
+      withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host))
+
+    ClusterConfig.saveConfig(config, configFile)
+
+    val master = masters.head
+
+    ConfigFactory.invalidateCaches()
+    launch(supervisor, master.host, master.port, configFile.toString)
+  }
+
+  // Launch the UI server
+  def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
+    Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort"
+      , "-conf", configFile))
+  }
+
+  // This actor does not expect any message
+  override def receive: Receive = {
+    case _ =>
+      LOG.error(s"Unknown message received")
+  }
+}
+
+/** UIFactory implementation which creates [[UIService]] actors. */
+object UIService extends UIFactory {
+  /** Returns Props which construct a UIService with the given arguments. */
+  override def props(masters: List[HostPort], host: String, port: Int): Props = {
+    Props(new UIService(masters, host, port))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
new file mode 100644
index 0000000..97577fb
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.GetMethod
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.experiments.yarn.Constants._
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util._
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as
+ * YARN containers.
+ *
+ * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker.
+ *
+ * The actor goes through these behaviors:
+ * waitForAppMasterRegistered -> startingMasters -> startingWorkers -> service.
+ * AddWorker moves it from service back to startingWorkers.
+ *
+ * @param rmClient client used to register the AppMaster and to request containers
+ * @param nmClient client used to launch commands in containers and to stop containers
+ * @param packagePath passed to every launched container together with the command
+ * @param hdfsConfDir passed to every launched container together with the command
+ * @param uiFactory creates the Props of the UI actor once the workers are started
+ */
+class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
+    packagePath: String, hdfsConfDir: String,
+    uiFactory: UIFactory)
+  extends Actor {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val akkaConf = context.system.settings.config
+  private val servicesEnabled = akkaConf.getString(SERVICES_ENABLED).toBoolean
+  private var uiStarted = false
+  private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
+
+  private val port = Util.findFreePort().get
+
+  private val trackingURL = "http://" + host + ":" + port
+
+  // TODO: for now, only one master is supported.
+  private val masterCount = 1
+  private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt
+  private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt
+
+  private var workerCount = akkaConf.getString(WORKER_CONTAINERS).toInt
+  private val workerMemory = akkaConf.getString(WORKER_MEMORY).toInt
+  private val workerVCores = akkaConf.getString(WORKER_VCORES).toInt
+
+  val rootPath = System.getProperty(Constants.GEARPUMP_FULL_SCALA_VERSION)
+
+  rmClient.start(self)
+  nmClient.start(self)
+
+  // The real behavior is installed with context.become in the constructor, see below
+  def receive: Receive = null
+
+  private def registerAppMaster(): Unit = {
+    rmClient.registerAppMaster(host, port, trackingURL)
+  }
+
+  registerAppMaster()
+  context.become(waitForAppMasterRegistered)
+
+  import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster._
+
+  /** Waits for the registration to complete, then requests the master containers. */
+  def waitForAppMasterRegistered: Receive = {
+    case AppMasterRegistered =>
+      LOG.info("YarnAppMaster registration completed")
+      requestMasterContainers(masterCount)
+      context.become(startingMasters(remain = masterCount, List.empty[MasterInfo]))
+  }
+
+  /**
+   * Launches masters on allocated containers and waits until they are started.
+   *
+   * @param remain number of masters which are not reported as started yet
+   * @param masters masters launched so far
+   */
+  private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box {
+    case ContainersAllocated(containers) =>
+      LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: "
+        + containers.size)
+      // NOTE(review): remain only decreases on ContainerStarted, so a second allocation
+      // which arrives before that may launch more masters than requested -- confirm.
+      val (used, unused) = containers.splitAt(Math.min(containers.length, remain))
+      val newMasters = used.map { container =>
+        MasterInfo(container.getId, container.getNodeId, launchMaster(container))
+      }
+
+      // Stops un-used containers
+      unused.foreach { container =>
+        nmClient.stopContainer(container.getId, container.getNodeId)
+      }
+
+      context.become(startingMasters(remain, newMasters ++ masters))
+    case ContainerStarted(containerId) =>
+      LOG.info(s"ContainerStarted: container ${containerId} started for master(remain=$remain) ")
+      if (remain > 1) {
+        context.become(startingMasters(remain - 1, masters))
+      } else {
+        // The last master is started, continue with the workers
+        requestWorkerContainers(workerCount)
+        context.become(startingWorkers(workerCount, masters, List.empty[WorkerInfo]))
+      }
+  }
+
+  /** Puts the common error handling in front of `receive` and the catch-all logging behind. */
+  private def box(receive: Receive): Receive = {
+    onError orElse receive orElse unHandled
+  }
+
+  /**
+   * Launches workers on allocated containers and waits until they are started.
+   *
+   * @param remain number of workers which are not reported as started yet
+   * @param masters the running masters
+   * @param workers workers launched so far
+   */
+  private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo])
+  : Receive = {
+    box {
+      case ContainersAllocated(containers) =>
+        LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " +
+          s"count: " + containers.size)
+
+        val (used, unused) = containers.splitAt(Math.min(containers.length, remain))
+        val newWorkers = used.map { container =>
+          launchWorker(container, masters)
+          WorkerInfo(container.getId, container.getNodeId)
+        }
+
+        // Stops un-used containers
+        unused.foreach { container =>
+          nmClient.stopContainer(container.getId, container.getNodeId)
+        }
+        context.become(startingWorkers(remain, masters, workers ++ newWorkers))
+      case ContainerStarted(containerId) =>
+        LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)")
+        if (remain > 1) {
+          context.become(startingWorkers(remain - 1, masters, workers))
+        } else {
+          // The last one
+          if (servicesEnabled && !uiStarted) {
+            context.actorOf(uiFactory.props(masters.map(_.host), host, port))
+            uiStarted = true
+          }
+          context.become(service(effectiveConfig(masters.map(_.host)), masters, workers))
+        }
+    }
+  }
+
+  /** The actor system configuration with the master list replaced by `masters`. */
+  private def effectiveConfig(masters: List[HostPort]): Config = {
+    val masterList = masters.map(pair => s"${pair.host}:${pair.port}")
+    val config = context.system.settings.config
+    config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
+      ConfigValueFactory.fromIterable(masterList.asJava))
+  }
+
+  /** Handling of container completion, shutdown, resource manager errors and Kill. */
+  private def onError: Receive = {
+    case ContainersCompleted(containers) =>
+      // TODO: we should recover the failed container from this...
+      containers.foreach { status =>
+        if (status.getExitStatus != 0) {
+          LOG.error(s"ContainersCompleted: container ${status.getContainerId}" +
+            s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
+        } else {
+          LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed")
+        }
+      }
+    case ShutdownApplication =>
+      LOG.error("ShutdownApplication")
+      nmClient.stop()
+      rmClient.shutdownApplication()
+      context.stop(self)
+    case ResourceManagerException(ex) =>
+      LOG.error("ResourceManagerException: " + ex.getMessage, ex)
+      nmClient.stop()
+      rmClient.failApplication(ex)
+      context.stop(self)
+    case Kill =>
+      LOG.info("Kill: User asked to shutdown the application")
+      sender ! CommandResult(success = true)
+      self ! ShutdownApplication
+  }
+
+  /**
+   * Behavior after all requested workers are started; serves the client commands.
+   *
+   * @param config configuration returned for GetActiveConfig
+   * @param masters the running masters
+   * @param workers the running workers
+   */
+  private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo])
+  : Receive = box {
+    case GetActiveConfig(clientHost) =>
+      LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost)
+      val filtered = ClusterConfig.filterOutDefaultConfig(
+        config.withValue(Constants.GEARPUMP_HOSTNAME,
+          ConfigValueFactory.fromAnyRef(clientHost)))
+      sender ! ActiveConfig(filtered)
+    case QueryVersion =>
+      LOG.info("QueryVersion")
+      sender ! Version(Util.version)
+    case QueryClusterInfo =>
+      LOG.info("QueryClusterInfo")
+      val masterContainers = masters.map { master =>
+        master.id.toString + s"(${master.nodeId.toString})"
+      }
+
+      val workerContainers = workers.map { worker =>
+        worker.id.toString + s"(${worker.nodeId.toString})"
+      }
+      sender ! ClusterInfo(masterContainers, workerContainers)
+    case AddMaster =>
+      sender ! CommandResult(success = false, "Not Implemented")
+    case RemoveMaster(_) =>
+      sender ! CommandResult(success = false, "Not Implemented")
+    case AddWorker(count) =>
+      if (count == 0) {
+        sender ! CommandResult(success = true)
+      } else {
+        LOG.info("AddWorker: Start to add new workers, count: " + count)
+        workerCount += count
+        requestWorkerContainers(count)
+        context.become(startingWorkers(count, masters, workers))
+        sender ! CommandResult(success = true)
+      }
+    case RemoveWorker(worker) =>
+      val workerId = ContainerId.fromString(worker)
+      LOG.info(s"RemoveWorker: remove worker $workerId")
+      workers.find(_.id.toString == workerId.toString) match {
+        case Some(info) =>
+          nmClient.stopContainer(info.id, info.nodeId)
+          sender ! CommandResult(success = true)
+          val remainWorkers = workers.filter(_.id != info.id)
+          context.become(service(config, masters, remainWorkers))
+        case None =>
+          sender ! CommandResult(success = false, "failed to find worker " + worker)
+      }
+  }
+
+  /** Logs every message which no other handler accepted. */
+  private def unHandled: Receive = {
+    case other =>
+      LOG.info(s"Received unknown message $other")
+  }
+
+  /** Requests `masters` containers sized with the master memory and vcores settings. */
+  private def requestMasterContainers(masters: Int): Unit = {
+    LOG.info(s"Request resource for masters($masters)")
+    val containers = List.fill(masters)(Resource.newInstance(masterMemory, masterVCores))
+    rmClient.requestContainers(containers)
+  }
+
+  /** Launches a master in `container` and returns the address it was told to listen on. */
+  private def launchMaster(container: Container): HostPort = {
+    LOG.info(s"Launch Master on container " + container.getNodeHttpAddress)
+    val host = container.getNodeId.getHost
+
+    // NOTE(review): the port is probed on this node, not on the container's node -- confirm
+    // that this is intended.
+    val port = Util.findFreePort().get
+
+    LOG.info("=============PORT" + port)
+    val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port))
+    nmClient.launchCommand(container, masterCommand.get, packagePath, hdfsConfDir)
+    HostPort(host, port)
+  }
+
+  /** Requests `workers` containers sized with the worker memory and vcores settings. */
+  private def requestWorkerContainers(workers: Int): Unit = {
+    LOG.info(s"Request resource for workers($workers)")
+    val containers = List.fill(workers)(Resource.newInstance(workerMemory, workerVCores))
+
+    rmClient.requestContainers(containers)
+  }
+
+  /** Launches a worker in `container`, connecting to the first master of `masters`. */
+  private def launchWorker(container: Container, masters: List[MasterInfo]): Unit = {
+    LOG.info(s"Launch Worker on container " + container.getNodeHttpAddress)
+    val workerHost = container.getNodeId.getHost
+    val workerCommand = WorkerCommand(akkaConf, rootPath, masters.head.host, workerHost)
+    nmClient.launchCommand(container, workerCommand.get, packagePath, hdfsConfDir)
+  }
+}
+
+/**
+ * Entry point of the YARN AppMaster process; also defines the messages of the
+ * YarnAppMaster actor.
+ */
+object YarnAppMaster extends AkkaApp with ArgumentsParser {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "conf" -> CLIOption[String]("<Gearpump configuration directory on HDFS>", required = true),
+    "package" -> CLIOption[String]("<Gearpump package path on HDFS>", required = true)
+  )
+
+  override def akkaConfig: Config = {
+    ClusterConfig.ui()
+  }
+
+  /**
+   * Starts the actor system with the YarnAppMaster actor and blocks until the system is
+   * terminated.
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+    implicit val system = ActorSystem("GearpumpAM", akkaConf)
+
+    val yarnConf = new YarnConfig()
+
+    // Parse the arguments only once
+    val parsed = parse(args)
+    val confDir = parsed.getString("conf")
+    val packagePath = parsed.getString("package")
+
+    LOG.info("HADOOP_CONF_DIR: " + System.getenv("HADOOP_CONF_DIR"))
+    LOG.info("YARN Resource Manager: " + yarnConf.resourceManager)
+
+    val rmClient = new RMClient(yarnConf)
+    val nmClient = new NMClient(yarnConf, akkaConf)
+    val appMaster = system.actorOf(Props(new YarnAppMaster(rmClient,
+      nmClient, packagePath, confDir, UIService)))
+
+    // Terminates the actor system as soon as the AppMaster actor is terminated
+    system.actorOf(Props(new Daemon(appMaster)))
+    Await.result(system.whenTerminated, Duration.Inf)
+    LOG.info("YarnAppMaster is shutdown")
+  }
+
+  /** Watches `appMaster` and terminates the actor system when it is terminated. */
+  class Daemon(appMaster: ActorRef) extends Actor {
+    context.watch(appMaster)
+
+    override def receive: Actor.Receive = {
+      case Terminated(actor) =>
+        if (actor.compareTo(appMaster) == 0) {
+          LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " +
+            s"shutting down current ActorSystem")
+          context.system.terminate()
+          context.stop(self)
+        }
+    }
+  }
+
+  case class ResourceManagerException(throwable: Throwable)
+  case object ShutdownApplication
+  case class ContainersRequest(containers: List[Resource])
+  case class ContainersAllocated(containers: List[Container])
+  case class ContainersCompleted(containers: List[ContainerStatus])
+  case class ContainerStarted(containerId: ContainerId)
+  case object AppMasterRegistered
+
+  case class GetActiveConfig(clientHost: String)
+
+  case object QueryClusterInfo
+  case class ClusterInfo(masters: List[String], workers: List[String]) {
+    /** Lists the masters and the workers, one entry per line, under a heading each. */
+    override def toString: String = {
+      val separator = "\n"
+      val masterSection = "masters: " + separator + masters.mkString(separator) + separator
+
+      val workerSection = "workers: " + separator + workers.mkString(separator) + separator
+      masterSection + workerSection
+    }
+  }
+
+  case object Kill
+  case class ActiveConfig(config: Config)
+
+  case object QueryVersion
+
+  case class Version(version: String)
+
+  case class MasterInfo(id: ContainerId, nodeId: NodeId, host: HostPort)
+
+  case class WorkerInfo(id: ContainerId, nodeId: NodeId)
+
+  /**
+   * Resolves the ActorRef of the YarnAppMaster from the tracking URL of the application.
+   * The actor path is fetched with HTTP; one retry is made after 3 seconds.
+   *
+   * @param report application report which provides the original tracking URL
+   * @param system actor system used to look up the actor
+   * @throws IOException if neither attempt returns HTTP status 200
+   */
+  def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
+    val client = new HttpClient()
+    val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
+
+    // Returns the response body on status 200. Every attempt uses its own GetMethod and
+    // releases the connection afterwards.
+    def fetchActorPath(): Option[String] = {
+      val get = new GetMethod(appMasterPath)
+      try {
+        if (client.executeMethod(get) == 200) Some(get.getResponseBodyAsString) else None
+      } finally {
+        get.releaseConnection()
+      }
+    }
+
+    val actorPath = fetchActorPath().orElse {
+      // Sleeps a little bit, and try again
+      Thread.sleep(3000)
+      fetchActorPath()
+    }
+
+    actorPath match {
+      case Some(path) =>
+        AkkaHelper.actorFor(system, path)
+      case None =>
+        throw new IOException("Fail to resolve AppMaster address, please make sure " +
+          s"${report.getOriginalTrackingUrl} is accessible...")
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
new file mode 100644
index 0000000..9fb69b2
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.IOException
+
+import akka.actor.{ActorRef, ActorSystem}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.GetMethod
+import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.gearpump.experiments.yarn.glue.YarnClient
+import org.apache.gearpump.util.{AkkaHelper, LogUtil}
+
+import scala.util.Try
+
+/**
+ * Resolves AppMaster ActorRef
+ *
+ * The actor path is fetched over HTTP from `<original tracking url>/supervisor-actor-path`
+ * of the application report and then turned into an ActorRef.
+ *
+ * @param yarnClient client used to fetch the application report
+ * @param system actor system used to look up the ActorRef
+ */
+class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
+  val LOG = LogUtil.getLogger(getClass)
+  val RETRY_INTERVAL_MS = 3000 // ms
+
+  /**
+   * Tries to resolve the AppMaster of `appId`, pausing RETRY_INTERVAL_MS before every attempt.
+   *
+   * @param appId application to resolve
+   * @param timeoutSeconds time budget; `1 + timeoutSeconds * 1000 / RETRY_INTERVAL_MS` attempts
+   *                       are made at most
+   * @return the resolved ActorRef, never null
+   * @throws IOException if no attempt succeeds
+   */
+  def resolve(appId: ApplicationId, timeoutSeconds: Int = 30): ActorRef = {
+    val appMaster = retry(connect(appId), 1 + timeoutSeconds * 1000 / RETRY_INTERVAL_MS)
+    appMaster
+  }
+
+  /** Performs a single resolution attempt; throws IOException unless HTTP 200 is returned. */
+  private def connect(appId: ApplicationId): ActorRef = {
+    val report = yarnClient.getApplicationReport(appId)
+    val client = new HttpClient()
+    val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
+    LOG.info(s"appMasterPath=$appMasterPath")
+    val get = new GetMethod(appMasterPath)
+    try {
+      val status = client.executeMethod(get)
+      if (status == 200) {
+        val response = get.getResponseBodyAsString
+        LOG.info("Successfully resolved AppMaster address: " + response)
+        AkkaHelper.actorFor(system, response)
+      } else {
+        throw new IOException("Fail to resolve AppMaster address, please make sure " +
+          s"${report.getOriginalTrackingUrl} is accessible...")
+      }
+    } finally {
+      // The response body has been read above, so the connection can be handed back.
+      get.releaseConnection()
+    }
+  }
+
+  /**
+   * Evaluates `fun` up to `times` times and returns the first non-null result.
+   *
+   * Throws IOException (with the last failure as cause) instead of returning null, because
+   * callers dereference the result right away.
+   */
+  private def retry(fun: => ActorRef, times: Int): ActorRef = {
+    import scala.util.{Failure, Success}
+
+    @scala.annotation.tailrec
+    def attempt(index: Int, lastFailure: Option[Throwable]): ActorRef = {
+      if (index > times) {
+        throw new IOException(
+          s"Failed to connect YarnAppMaster after ${index - 1} attempt(s)", lastFailure.orNull)
+      } else {
+        Thread.sleep(RETRY_INTERVAL_MS)
+        Try(fun) match {
+          case Success(appMaster) if appMaster != null =>
+            appMaster
+          case Success(_) =>
+            attempt(index + 1, lastFailure)
+          case Failure(ex) =>
+            LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " + ex.getMessage)
+            attempt(index + 1, Some(ex))
+        }
+      }
+    }
+
+    attempt(1, None)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
new file mode 100644
index 0000000..9ec2eae
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.yarn.client
+
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+/**
+ * Command line entry point for Gearpump on YARN. The first argument selects the command:
+ * "launch" is handled by LaunchCluster, every other known command by ManageCluster.
+ */
+object Client {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  val LAUNCH = "launch"
+
+  // Maps every supported command keyword to the application that handles it.
+  val commands = Map(LAUNCH -> LaunchCluster) ++
+    ManageCluster.commands.map(key => (key, ManageCluster)).toMap
+
+  /** Prints the supported command keywords, sorted, to stderr. */
+  def usage(): Unit = {
+    val sortedKeys = commands.keys.toList.sorted
+    // scalastyle:off println
+    Console.err.println("Usage: " + "<" + sortedKeys.mkString("|") + ">")
+    // scalastyle:on println
+  }
+
+  /**
+   * Dispatches to the handler of the command named by the first argument.
+   * Prints the usage when no argument is given or the command is unknown.
+   */
+  def main(args: Array[String]): Unit = {
+    args.headOption match {
+      case None =>
+        usage()
+      case Some(key) =>
+        val remainArgs = args.drop(1)
+        commands.get(key) match {
+          case Some(app) if key == LAUNCH =>
+            app.main(remainArgs)
+          case Some(app) =>
+            // Management commands are passed on as "-command <key>" plus the remaining args.
+            app.main(Array("-" + ManageCluster.COMMAND, key) ++ remainArgs)
+          case None =>
+            usage()
+        }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
new file mode 100644
index 0000000..2475728
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.{File, IOException, OutputStreamWriter}
+import java.net.InetAddress
+import java.util.zip.ZipInputStream
+
+import akka.actor.ActorSystem
+import com.typesafe.config.{Config, ConfigValueFactory}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.experiments.yarn.Constants
+import org.apache.gearpump.experiments.yarn.appmaster.AppMasterCommand
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
+import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
+import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
+import org.apache.gearpump.util.ActorUtil.askActor
+import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
+import org.slf4j.Logger
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+/**
+ * Launch Gearpump on YARN
+ *
+ * @param akka configuration; supplies the AppMaster queue, memory and vcores settings
+ * @param yarnConf YARN configuration, uploaded as yarn-site.xml next to the Gearpump config
+ * @param yarnClient client used to create, submit and await the YARN application
+ * @param fs file system holding the distribution package and receiving the config files
+ * @param actorSystem actor system whose dispatcher runs the futures created here
+ * @param appMasterResolver resolves the YarnAppMaster ActorRef of a submitted application
+ * @param version local Gearpump version, checked against the package's root entry name
+ */
+class LaunchCluster(
+    akka: Config,
+    yarnConf: YarnConfig,
+    yarnClient: YarnClient,
+    fs: FileSystem,
+    actorSystem: ActorSystem,
+    appMasterResolver: AppMasterResolver,
+    version: String = Util.version) {
+
+  import org.apache.gearpump.experiments.yarn.Constants._
+  private implicit val dispatcher = actorSystem.dispatcher
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val host = InetAddress.getLocalHost.getHostName
+  private val queue = akka.getString(APPMASTER_QUEUE)
+  private val memory = akka.getString(APPMASTER_MEMORY).toInt
+  private val vcore = akka.getString(APPMASTER_VCORES).toInt
+
+  /**
+   * Validates the distribution package, uploads the configuration files and submits the
+   * AppMaster to YARN, then waits via `yarnClient.awaitApplication`.
+   *
+   * @param appName application name passed to YARN
+   * @param packagePath path of the .zip distribution package on the remote file system
+   * @return id of the created application
+   * @throws IOException if the package is not a .zip, does not exist, cannot be inspected,
+   *                     or its root entry does not contain the local version
+   */
+  def submit(appName: String, packagePath: String): ApplicationId = {
+    LOG.info("Starting AM")
+
+    // First step, check the version, to make sure local version matches remote version
+    if (!packagePath.endsWith(".zip")) {
+      throw new IOException(s"YarnClient only support .zip distribution package," +
+        s" now it is ${packagePath}. Please download the zip " +
+        "package from website or use sbt assembly packArchiveZip to build one.")
+    }
+
+    if (!fs.exists(packagePath)) {
+      throw new IOException(s"Cannot find package ${packagePath} on HDFS ${fs.name}. ")
+    }
+
+    val rootEntry = rootEntryPath(zip = packagePath)
+
+    if (!rootEntry.contains(version)) {
+      throw new IOException(s"Check version failed! Local gearpump binary" +
+        s" version $version doesn't match with remote path $packagePath")
+    }
+
+    val resource = Resource.newInstance(memory, vcore)
+    val appId = yarnClient.createApplication
+
+    // uploads the configs to HDFS home directory of current user.
+    val configPath = uploadConfigToHDFS(appId)
+
+    val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
+      s"-package $packagePath"))
+
+    yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)
+
+    LOG.info("Waiting application to finish...")
+    val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
+    LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
+      s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")
+
+    // scalastyle:off println
+    Console.println("================================================")
+    Console.println("==Application Id: " + appId)
+    // scalastyle:on println
+    appId
+  }
+
+  /**
+   * Asks the YarnAppMaster of `appId` for its active configuration and stores it locally.
+   *
+   * @param appId application whose AppMaster is queried
+   * @param output local path the configuration is written to
+   * @return future completing with the written file
+   */
+  def saveConfig(appId: ApplicationId, output: String): Future[File] = {
+    LOG.info(s"Trying to download active configuration to output path: $output")
+    LOG.info(s"Resolving YarnAppMaster ActorRef for application $appId")
+    val appMaster = appMasterResolver.resolve(appId)
+    LOG.info(s"appMaster=${appMaster.path} host=$host")
+    val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config)
+    future.map { config =>
+      val out = new File(output)
+      ClusterConfig.saveConfig(config, out)
+      out
+    }
+  }
+
+  /**
+   * Writes gear.conf, yarn-site.xml and log4j.properties into a per-application directory
+   * and returns that directory.
+   */
+  private def uploadConfigToHDFS(appId: ApplicationId): String = {
+    // Uses personal home directory so that it will not conflict with other users
+    // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
+    val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
+    LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")
+
+    // Copies config from local to remote.
+    writeRemoteFile(s"$confDir/gear.conf") { writer =>
+      val cleanedConfig = ClusterConfig.filterOutDefaultConfig(akka)
+      writer.write(cleanedConfig.root().render())
+    }
+
+    // Saves yarn-site.xml to remote
+    writeRemoteFile(s"$confDir/yarn-site.xml") { writer =>
+      yarnConf.writeXml(writer)
+    }
+
+    // Saves log4j.properties to remote
+    val log4j = LogUtil.loadConfiguration
+    writeRemoteFile(s"$confDir/log4j.properties") { writer =>
+      log4j.store(writer, "gearpump on yarn")
+    }
+    confDir.toString
+  }
+
+  /** Creates `path` on `fs`, runs `write` on it and closes the writer even if `write` fails. */
+  private def writeRemoteFile(path: String)(write: OutputStreamWriter => Unit): Unit = {
+    val writer = new OutputStreamWriter(fs.create(path))
+    try {
+      write(writer)
+    } finally {
+      writer.close()
+    }
+  }
+
+  /**
+   * Returns the part before the first "/" of the name of the first entry of `zip`.
+   * Throws IOException when the archive has no entry or the entry name contains no "/".
+   */
+  private def rootEntryPath(zip: String): String = {
+    val stream = new ZipInputStream(fs.open(zip))
+    try {
+      // getNextEntry returns null when the archive holds no entry at all.
+      val name = Option(stream.getNextEntry()).map(_.getName).getOrElse {
+        throw new IOException(s"Package $zip is empty or is not a valid zip archive.")
+      }
+      val slash = name.indexOf("/")
+      if (slash < 0) {
+        throw new IOException(
+          s"Cannot determine the root directory of package $zip, first entry is '$name'.")
+      }
+      name.substring(0, slash)
+    } finally {
+      stream.close()
+    }
+  }
+}
+
+/** Command line application that submits a Gearpump cluster to YARN. */
+object LaunchCluster extends AkkaApp with ArgumentsParser {
+
+  val PACKAGE = "package"
+  val NAME = "name"
+  val VERBOSE = "verbose"
+  val OUTPUT = "output"
+
+  override protected def akkaConfig: Config = {
+    ClusterConfig.default()
+  }
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
+      "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
+    NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
+      defaultValue = Some("Gearpump")),
+    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)),
+    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+      defaultValue = None)
+  )
+  // Used both for awaiting the submitted application and for awaiting the config download.
+  private val TIMEOUT_MILLISECONDS = 30 * 1000
+
+  /**
+   * Submits the cluster and, when -output is given, downloads the active configuration.
+   *
+   * The YARN client and the actor system are shut down in a `finally` block so that a failed
+   * submission does not leave them running (a live actor system would presumably keep the
+   * JVM from exiting).
+   *
+   * @param inputAkkaConf base configuration; -package overrides its package path
+   * @param args command line arguments, see `options`
+   */
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    if (parsed.getBoolean(VERBOSE)) {
+      LogUtil.verboseLogToConsole()
+    }
+
+    val yarnConfig = new YarnConfig()
+    val fs = new FileSystem(yarnConfig)
+    val yarnClient = new YarnClient(yarnConfig)
+    val akkaConf = updateConf(inputAkkaConf, parsed)
+    val actorSystem = ActorSystem("launchCluster", akkaConf)
+
+    try {
+      val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
+
+      val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
+        actorSystem, appMasterResolver)
+
+      val name = parsed.getString(NAME)
+      val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))
+
+      if (parsed.exists(OUTPUT)) {
+        import scala.concurrent.duration._
+        Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
+          TIMEOUT_MILLISECONDS.milliseconds)
+      }
+    } finally {
+      try {
+        yarnClient.stop()
+      } finally {
+        actorSystem.terminate()
+        Await.result(actorSystem.whenTerminated, Duration.Inf)
+      }
+    }
+  }
+
+  /** Returns `akka` with the package path replaced by -package when that option is present. */
+  private def updateConf(akka: Config, parsed: ParseResult): Config = {
+    if (parsed.exists(PACKAGE)) {
+      akka.withValue(Constants.PACKAGE_PATH,
+        ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
+    } else {
+      akka
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
new file mode 100644
index 0000000..5b12346
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.{File, IOException}
+import java.net.InetAddress
+import scala.concurrent.{Await, Future}
+
+import akka.actor.{ActorRef, ActorSystem}
+
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
+import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig}
+import org.apache.gearpump.util.ActorUtil.askActor
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/**
+ * Manage current Gearpump cluster on YARN
+ *
+ * Every operation is an ask to the given AppMaster actor.
+ *
+ * @param appId id of the managed application
+ * @param appMaster ActorRef of the AppMaster the requests are sent to
+ * @param system actor system providing the dispatcher for the returned futures
+ */
+class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) {
+  import org.apache.gearpump.experiments.yarn.client.ManageCluster._
+
+  private val host = InetAddress.getLocalHost.getHostName
+  implicit val dispatcher = system.dispatcher
+
+  /** Asks for the active configuration, passing the local host name. */
+  def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host))
+
+  /** Asks for the version. */
+  def version: Future[Version] = askActor[Version](appMaster, QueryVersion)
+
+  /** Asks to add `count` workers. */
+  def addWorker(count: Int): Future[CommandResult] = {
+    askActor[CommandResult](appMaster, AddWorker(count))
+  }
+
+  /** Asks to remove the worker identified by `worker`. */
+  def removeWorker(worker: String): Future[CommandResult] = {
+    askActor[CommandResult](appMaster, RemoveWorker(worker))
+  }
+
+  /** Sends the Kill command. */
+  def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill)
+
+  /** Asks for the list of masters and workers. */
+  def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo)
+
+  /**
+   * Runs the operation named by `command`, reading its arguments from `parsed`.
+   *
+   * @param command one of the keywords in `ManageCluster.commands`
+   * @param parsed parsed command line options
+   * @return future holding the AppMaster's answer
+   * @throws IOException if a required option is missing or the command is not supported
+   */
+  def command(command: String, parsed: ParseResult): Future[AnyRef] = {
+    command match {
+      case GET_CONFIG =>
+        if (parsed.exists(OUTPUT)) {
+          getConfig.map { conf =>
+            ClusterConfig.saveConfig(conf.config, new File(parsed.getString(OUTPUT)))
+            conf
+          }
+        } else {
+          throw new IOException(s"Please specify -$OUTPUT option")
+        }
+      case ADD_WORKER =>
+        val count = parsed.getString(COUNT).toInt
+        addWorker(count)
+      case REMOVE_WORKER =>
+        val containerId = parsed.getString(CONTAINER)
+        if (containerId == null || containerId.isEmpty) {
+          throw new IOException(s"Please specify -$CONTAINER option")
+        } else {
+          removeWorker(containerId)
+        }
+      case KILL =>
+        shutdown
+      case QUERY =>
+        queryClusterInfo
+      case VERSION =>
+        version
+      case unsupported =>
+        // Without this case an unknown keyword surfaces as a bare scala.MatchError.
+        throw new IOException(s"Unsupported command '$unsupported', " +
+          s"please use one of <${commands.mkString("|")}>")
+    }
+  }
+}
+
+/** Command line application that sends one management command to a running cluster. */
+object ManageCluster extends AkkaApp with ArgumentsParser {
+  val GET_CONFIG = "getconfig"
+  val ADD_WORKER = "addworker"
+  val REMOVE_WORKER = "removeworker"
+  val KILL = "kill"
+  val VERSION = "version"
+  val QUERY = "query"
+  val COMMAND = "command"
+  val CONTAINER = "container"
+  val OUTPUT = "output"
+  val COUNT = "count"
+  val APPID = "appid"
+  val VERBOSE = "verbose"
+
+  val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY)
+
+  import scala.concurrent.duration._
+  val TIME_OUT_SECONDS = 30.seconds
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true),
+    APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>",
+    required = true),
+    COUNT -> CLIOption("<how many instance to add or remove>", required = false,
+      defaultValue = Some(1)),
+    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)),
+    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+      defaultValue = Some("")),
+    CONTAINER -> CLIOption("<container id for master or worker>", required = false,
+      defaultValue = Some(""))
+  )
+
+  /**
+   * Resolves the AppMaster of -appid, runs -command against it and prints the answer.
+   *
+   * The actor system and the YARN client are released in `finally` blocks, so a failed
+   * resolution, a failed command or a timeout does not leave them running.
+   *
+   * @param akkaConf configuration of the actor system created here
+   * @param args command line arguments, see `options`
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    val yarnConfig = new YarnConfig()
+    val yarnClient = new YarnClient(yarnConfig)
+
+    try {
+      val parsed = parse(args)
+
+      if (parsed.getBoolean(VERBOSE)) {
+        LogUtil.verboseLogToConsole()
+      }
+
+      val appId = parseAppId(parsed.getString(APPID))
+      val system = ActorSystem("manageCluster", akkaConf)
+
+      try {
+        val appMasterResolver = new AppMasterResolver(yarnClient, system)
+        val appMaster = appMasterResolver.resolve(appId)
+
+        val manager = new ManageCluster(appId, appMaster, system)
+
+        val command = parsed.getString(COMMAND)
+        val result = manager.command(command, parsed)
+
+        // scalastyle:off println
+        Console.println(Await.result(result, TIME_OUT_SECONDS))
+        // scalastyle:on println
+      } finally {
+        system.terminate()
+        Await.result(system.whenTerminated, Duration.Inf)
+      }
+    } finally {
+      yarnClient.stop()
+    }
+  }
+
+  /**
+   * Parses an application id of the form `application_timestamp_id`.
+   *
+   * Only the second and third "_"-separated parts are read; the first part is not checked.
+   *
+   * @param str textual application id
+   * @return ApplicationId built from the timestamp and the id
+   * @throws IllegalArgumentException if `str` has fewer than three parts; a
+   *                                  NumberFormatException is raised for non-numeric parts
+   */
+  def parseAppId(str: String): ApplicationId = {
+    val parts = str.split("_")
+    require(parts.length >= 3,
+      s"Invalid application id '$str', expected format: application_timestamp_id")
+    val timestamp = parts(1).toLong
+    val id = parts(2).toInt
+    ApplicationId.newInstance(timestamp, id)
+  }
+}
\ No newline at end of file