You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:32 UTC
[22/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
deleted file mode 100644
index 3e50abe..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.client
-
-import java.io.{File, IOException}
-import java.net.InetAddress
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{ActorRef, ActorSystem}
-
-import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
-import io.gearpump.experiments.yarn.glue.Records.ApplicationId
-import io.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig}
-import io.gearpump.util.ActorUtil.askActor
-import io.gearpump.util.{AkkaApp, LogUtil}
-
-/**
- * Manage current Gearpump cluster on YARN.
- *
- * Each operation sends a request to the YARN application master actor through `askActor`
- * and hands back the reply as a `Future`.
- *
- * @param appId id of the YARN application hosting the cluster
- * @param appMaster actor that receives the management requests
- * @param system actor system whose dispatcher runs the future callbacks
- */
-class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) {
-  import io.gearpump.experiments.yarn.client.ManageCluster._
-
-  private val host = InetAddress.getLocalHost.getHostName
-  implicit val dispatcher = system.dispatcher
-
-  /** Requests the active configuration, passing the local host name to the app master. */
-  def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host))
-
-  /** Requests the version reported by the app master. */
-  def version: Future[Version] = askActor[Version](appMaster, QueryVersion)
-
-  /** Requests `count` additional workers. */
-  def addWorker(count: Int): Future[CommandResult] = {
-    askActor[CommandResult](appMaster, AddWorker(count))
-  }
-
-  /** Requests removal of the worker identified by the given container id string. */
-  def removeWorker(worker: String): Future[CommandResult] = {
-    askActor[CommandResult](appMaster, RemoveWorker(worker))
-  }
-
-  /** Sends `Kill` to the app master. */
-  def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill)
-
-  /** Requests the cluster information known to the app master. */
-  def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo)
-
-  /**
-   * Dispatches a command line command to the matching operation.
-   *
-   * @param command one of the names listed in `ManageCluster.commands`
-   * @param parsed parsed command line options, used to read command specific arguments
-   * @return the reply of the application master
-   * @throws IOException synchronously (not as a failed future) when a required option is
-   *                     missing or when the command name is unknown
-   */
-  def command(command: String, parsed: ParseResult): Future[AnyRef] = {
-    command match {
-      case GET_CONFIG =>
-        if (parsed.exists(OUTPUT)) {
-          getConfig.map { conf =>
-            // Persist the received configuration before handing it back to the caller.
-            ClusterConfig.saveConfig(conf.config, new File(parsed.getString(OUTPUT)))
-            conf
-          }
-        } else {
-          throw new IOException(s"Please specify -$OUTPUT option")
-        }
-      case ADD_WORKER =>
-        val count = parsed.getString(COUNT).toInt
-        addWorker(count)
-      case REMOVE_WORKER =>
-        val containerId = parsed.getString(CONTAINER)
-        if (containerId == null || containerId.isEmpty) {
-          throw new IOException(s"Please specify -$CONTAINER option")
-        } else {
-          removeWorker(containerId)
-        }
-      case KILL =>
-        shutdown
-      case QUERY =>
-        queryClusterInfo
-      case VERSION =>
-        version
-      case unknown =>
-        // Without this case an unsupported command surfaces as a bare scala.MatchError.
-        throw new IOException(
-          s"Unknown command '$unknown', expected one of: ${commands.mkString("|")}")
-    }
-  }
-}
-
-/**
- * Command line entry point for managing a Gearpump cluster on YARN.
- *
- * Parses the options declared in `options`, resolves the application master of the given
- * YARN application, runs one command against it and prints the reply to the console.
- */
-object ManageCluster extends AkkaApp with ArgumentsParser {
-  // Supported values of the -command option.
-  val GET_CONFIG = "getconfig"
-  val ADD_WORKER = "addworker"
-  val REMOVE_WORKER = "removeworker"
-  val KILL = "kill"
-  val VERSION = "version"
-  val QUERY = "query"
-
-  // Names of the command line options.
-  val COMMAND = "command"
-  val CONTAINER = "container"
-  val OUTPUT = "output"
-  val COUNT = "count"
-  val APPID = "appid"
-  val VERBOSE = "verbose"
-
-  val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY)
-
-  import scala.concurrent.duration._
-  // Maximum time main() waits for the application master to reply.
-  val TIME_OUT_SECONDS = 30.seconds
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true),
-    APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>",
-      required = true),
-    COUNT -> CLIOption("<how many instance to add or remove>", required = false,
-      defaultValue = Some(1)),
-    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
-      defaultValue = Some(false)),
-    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
-      defaultValue = Some("")),
-    CONTAINER -> CLIOption("<container id for master or worker>", required = false,
-      defaultValue = Some(""))
-  )
-
-  /**
-   * Runs a single management command and prints its result.
-   *
-   * The actor system created here is always terminated, including when resolving the
-   * application master, running the command or waiting for the reply fails.
-   *
-   * @param akkaConf configuration used to create the actor system
-   * @param args raw command line arguments
-   */
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    val yarnConfig = new YarnConfig()
-    val yarnClient = new YarnClient(yarnConfig)
-
-    val parsed = parse(args)
-
-    if (parsed.getBoolean(VERBOSE)) {
-      LogUtil.verboseLogToConsole()
-    }
-
-    val appId = parseAppId(parsed.getString(APPID))
-    val system = ActorSystem("manageCluster", akkaConf)
-    implicit val dispatcher = system.dispatcher
-
-    try {
-      val appMasterResolver = new AppMasterResolver(yarnClient, system)
-      val appMaster = appMasterResolver.resolve(appId)
-      val manager = new ManageCluster(appId, appMaster, system)
-
-      val command = parsed.getString(COMMAND)
-      val result = manager.command(command, parsed)
-
-      // scalastyle:off println
-      Console.println(Await.result(result, TIME_OUT_SECONDS))
-      // scalastyle:on println
-    } finally {
-      // Shut down even on failure or timeout so a live actor system cannot keep the JVM running.
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-
-  /**
-   * Parses an application id of the form `application_timestamp_id`.
-   *
-   * @param str textual application id; only the second and third `_` separated parts are read
-   * @return the corresponding [[ApplicationId]]
-   * @throws IllegalArgumentException when fewer than three parts are present, or (as
-   *                                  NumberFormatException) when timestamp or id is not numeric
-   */
-  def parseAppId(str: String): ApplicationId = {
-    val parts = str.split("_")
-    if (parts.length < 3) {
-      throw new IllegalArgumentException(
-        s"Invalid application id '$str', expected format: application_timestamp_id")
-    }
-    val timestamp = parts(1).toLong
-    val id = parts(2).toInt
-    ApplicationId.newInstance(timestamp, id)
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
deleted file mode 100644
index 6b3385f..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import java.io.File
-import java.nio.ByteBuffer
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path}
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.mapreduce.security.TokenCache
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.slf4j.Logger
-
-import io.gearpump.util.LogUtil
-
-/**
- * Factory for the YARN `ContainerLaunchContext` record used to start a container.
- */
-private[glue]
-object ContainerLaunchContext {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  /**
-   * Builds a launch context for the given command.
-   *
-   * @param yarnConf YARN configuration used for the classpath, file system and tokens
-   * @param command command line to run inside the container
-   * @param packagePath path of the package archive, registered as local resource "pack"
-   * @param configPath path of the configuration file, registered as local resource "conf"
-   * @return a launch context with command, environment, tokens and local resources set
-   */
-  def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String)
-    : ContainerLaunchContext = {
-    val context = Records.newRecord(classOf[ContainerLaunchContext])
-    context.setCommands(Seq(command).asJava)
-    context.setEnvironment(getAppEnv(yarnConf).asJava)
-    context.setTokens(getToken(yarnConf, packagePath, configPath))
-    context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava)
-    context
-  }
-
-  private def getFs(yarnConf: YarnConfiguration) = YarnFileSystem.get(yarnConf)
-
-  /**
-   * Builds the container environment, which only contains CLASSPATH: the configured YARN
-   * application classpath followed by a wildcard entry for the container working directory.
-   */
-  private def getAppEnv(yarnConf: YarnConfiguration): Map[String, String] = {
-    val classPaths = yarnConf.getStrings(
-      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.mkString(File.pathSeparator))
-    val configuredPaths = Option(classPaths).getOrElse(Array(""))
-
-    // `:+` returns a new array instead of mutating the receiver, so the result has to be
-    // kept; discarding it silently dropped the working directory entry from the classpath.
-    val allPaths =
-      configuredPaths :+ (Environment.PWD.$() + File.separator + "*" + File.pathSeparator)
-
-    Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator))
-  }
-
-  /** Describes the package archive ("pack") and the configuration file ("conf"). */
-  private def getAMLocalResourcesMap(
-      yarnConf: YarnConfiguration, packagePath: String, configPath: String)
-    : Map[String, LocalResource] = {
-    val fs = getFs(yarnConf)
-
-    Map(
-      "pack" -> newYarnAppResource(fs, new Path(packagePath),
-        LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION),
-      "conf" -> newYarnAppResource(fs, new Path(configPath),
-        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION))
-  }
-
-  /**
-   * Creates a local resource record for `path`, taking size and timestamp from the file
-   * status reported by `fs`.
-   */
-  private def newYarnAppResource(
-      fs: YarnFileSystem, path: Path,
-      resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = {
-    val qualified = fs.makeQualified(path)
-    val status = fs.getFileStatus(qualified)
-    val resource = Records.newRecord(classOf[LocalResource])
-    resource.setType(resourceType)
-    resource.setVisibility(vis)
-    resource.setResource(ConverterUtils.getYarnUrlFromPath(qualified))
-    resource.setTimestamp(status.getModificationTime)
-    resource.setSize(status.getLen)
-    resource
-  }
-
-  /**
-   * Serializes the current user's credentials, after obtaining name node tokens for the
-   * package and configuration paths.
-   */
-  private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String)
-    : ByteBuffer = {
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    val dob = new DataOutputBuffer
-    val dirs = Array(new Path(packagePath), new Path(configPath))
-    TokenCache.obtainTokensForNamenodes(credentials, dirs, yc)
-    credentials.writeTokenStorageToStream(dob)
-    // getData exposes the whole backing array; only the first getLength bytes are valid.
-    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
deleted file mode 100644
index acf09ac..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import java.io.{InputStream, OutputStream}
-import java.net.ConnectException
-import scala.util.{Failure, Success, Try}
-
-import org.apache.hadoop.fs.Path
-
-import io.gearpump.util.LogUtil
-
-/**
- * Thin adapter over the Hadoop file system selected by the given YARN configuration.
- *
- * `open`, `create` and `exists` log a configuration hint when the call fails with a
- * `ConnectException`; the original exception is always rethrown.
- */
-class FileSystem(yarnConfig: YarnConfig) {
-
-  private val conf = yarnConfig.conf
-  private val fs = org.apache.hadoop.fs.FileSystem.get(conf)
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  /** Opens `file` for reading. */
-  def open(file: String): InputStream = exceptionHandler {
-    val path = new Path(file)
-    fs.open(path)
-  }
-
-  /** Creates `file` and returns a stream for writing to it. */
-  def create(file: String): OutputStream = exceptionHandler {
-    val path = new Path(file)
-    fs.create(path)
-  }
-
-  /** Returns whether `file` exists. */
-  def exists(file: String): Boolean = exceptionHandler {
-    val path = new Path(file)
-    fs.exists(path)
-  }
-
-  /** URI of the underlying file system, as a string. */
-  def name: String = {
-    fs.getUri.toString
-  }
-
-  /** Home directory reported by the underlying file system, as a string. */
-  def getHomeDirectory: String = {
-    fs.getHomeDirectory.toString
-  }
-
-  /**
-   * Evaluates `call` and rethrows any failure unchanged, first logging a hint about the
-   * Hadoop configuration when the failure is a `ConnectException`.
-   */
-  private def exceptionHandler[T](call: => T): T = {
-    Try(call) match {
-      case Success(v) => v
-      case Failure(ex: ConnectException) =>
-        // Only the part naming the file system is interpolated; "$HADOOP_HOME" must stay
-        // literal because it refers to the environment variable.
-        LOG.error("Please check whether we connect to the right HDFS file system, " +
-          s"current file system is $name." + "\n. Please copy all configs under " +
-          "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " +
-          "so that we can use the right File system.", ex)
-        throw ex
-      case Failure(ex) =>
-        throw ex
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
deleted file mode 100644
index 3e7e668..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import java.nio.ByteBuffer
-
-import akka.actor.ActorRef
-import com.typesafe.config.Config
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource}
-import org.apache.hadoop.yarn.client.api.async.NMClientAsync
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
-
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
-/**
- * Adapter for the asynchronous YARN node manager client.
- *
- * The instance registers itself as callback handler of the wrapped client. Container start
- * events are forwarded to the actor passed to `start`; all other callbacks are only logged.
- */
-class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.CallbackHandler {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  // Both fields stay null until start() is called.
-  private var reportTo: ActorRef = _
-  private var client: NMClientAsyncImpl = _
-
-  /**
-   * Creates, initializes and starts the underlying client.
-   *
-   * @param reportTo actor that receives `ContainerStarted` notifications
-   */
-  def start(reportTo: ActorRef): Unit = {
-    LOG.info("Starting Node Manager Client NMClient...")
-    this.reportTo = reportTo
-    val nmClient = new NMClientAsyncImpl(this)
-    client = nmClient
-    nmClient.init(yarnConf.conf)
-    nmClient.start()
-  }
-
-  private[glue]
-  override def onContainerStarted(
-      containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer])
-    : Unit = {
-    LOG.info(s"Container started : $containerId, $allServiceResponse")
-    reportTo ! ContainerStarted(containerId)
-  }
-
-  private[glue]
-  override def onContainerStatusReceived(
-      containerId: YarnContainerId, containerStatus: YarnContainerStatus): Unit = {
-    LOG.info(s"Container status received : $containerId, status $containerStatus")
-  }
-
-  private[glue]
-  override def onContainerStopped(containerId: YarnContainerId): Unit = {
-    LOG.error(s"Container stopped : $containerId")
-  }
-
-  private[glue]
-  override def onGetContainerStatusError(containerId: YarnContainerId, throwable: Throwable)
-    : Unit = logContainerError(containerId, throwable)
-
-  private[glue]
-  override def onStartContainerError(containerId: YarnContainerId, throwable: Throwable)
-    : Unit = logContainerError(containerId, throwable)
-
-  private[glue]
-  override def onStopContainerError(containerId: YarnContainerId, throwable: Throwable)
-    : Unit = logContainerError(containerId, throwable)
-
-  /** Shared logging for the three error callbacks, which all report the same message. */
-  private def logContainerError(containerId: YarnContainerId, throwable: Throwable): Unit = {
-    LOG.error(s"Container exception : $containerId", throwable)
-  }
-
-  /**
-   * Asynchronously starts `command` on the given container.
-   *
-   * @param container container to start
-   * @param command command line to run in the container
-   * @param packagePath path of the package archive handed to the launch context
-   * @param configPath path of the configuration file handed to the launch context
-   */
-  def launchCommand(
-      container: Container, command: String, packagePath: String, configPath: String): Unit = {
-    val target = s": ${container.getId}, host ip : ${container.getNodeId.getHost}"
-    LOG.info(s"Launching command : $command on container" + target)
-    val launchContext = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath)
-    client.startContainerAsync(container, launchContext)
-  }
-
-  /** Asynchronously stops the given container on the given node. */
-  def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = {
-    val containerName = containerId.toString
-    val nodeName = nodeId.toString
-    LOG.info(s"Stop container $containerName on node: $nodeName ")
-    client.stopContainerAsync(containerId, nodeId)
-  }
-
-  /** Stops the underlying client. */
-  def stop(): Unit = {
-    LOG.info("Shutdown NMClient")
-    client.stop()
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
deleted file mode 100644
index 7b9d83c..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import scala.collection.JavaConverters._
-
-import akka.actor.ActorRef
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource}
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
-import org.slf4j.Logger
-
-import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication}
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
-
-/**
- * Adapter for the asynchronous YARN resource manager client.
- *
- * The instance registers itself as callback handler of the wrapped client and forwards
- * allocation, completion, error and shutdown events to the actor passed to `start`.
- */
-class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  // Both fields stay null until start() is called.
-  private var reportTo: ActorRef = null
-  private var client: AMRMClientAsync[ContainerRequest] = null
-
-  /**
-   * Creates and starts the underlying client.
-   *
-   * @param reportTo actor that receives the resource manager events
-   */
-  def start(reportTo: ActorRef): Unit = {
-    LOG.info("Starting Resource Manager Client RMClient...")
-    this.reportTo = reportTo
-    client = startAMRMClient
-  }
-
-  private def startAMRMClient: AMRMClientAsync[ContainerRequest] = {
-    val timeIntervalMs = 1000 // ms
-    val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this)
-    amrmClient.init(yarnConf.conf)
-    amrmClient.start()
-    amrmClient
-  }
-
-  /** Always reports a fixed progress of 50%. */
-  override def getProgress: Float = 0.5F
-
-  // Ids of containers already reported, used to filter out repeated allocations.
-  private var allocatedContainers = Set.empty[YarnContainerId]
-
-  private[glue]
-  override def onContainersAllocated(containers: java.util.List[YarnContainer]): Unit = {
-    val newContainers = containers.asScala.toList.filterNot(container =>
-      allocatedContainers.contains(container.getId))
-    allocatedContainers ++= newContainers.map(_.getId)
-    LOG.info(s"New allocated ${newContainers.size} containers")
-    reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_)))
-  }
-
-  private[glue]
-  override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus])
-    : Unit = {
-    LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}")
-    reportTo ! ContainersCompleted(
-      completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_)))
-  }
-
-  private[glue]
-  override def onError(ex: Throwable): Unit = {
-    // Log at error level together with the cause; it was logged at info without the exception.
-    LOG.error("Error occurred", ex)
-    reportTo ! ResourceManagerException(ex)
-  }
-
-  private[glue]
-  override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = {
-    LOG.info("onNodesUpdates")
-  }
-
-  private[glue]
-  override def onShutdownRequest(): Unit = {
-    LOG.info("Shutdown requested")
-    reportTo ! ShutdownApplication
-  }
-
-  /** Submits one container request per entry of `containers`. */
-  def requestContainers(containers: List[Resource]): Unit = {
-    LOG.info(s"request Resource, slots: ${containers.length}, ${containers.mkString("\n")}")
-    containers.foreach(resource => {
-      client.addContainerRequest(createContainerRequest(resource))
-    })
-  }
-
-  /** Builds a request for `capability` with priority 0 and no node or rack constraints. */
-  private def createContainerRequest(capability: Resource): ContainerRequest = {
-    LOG.info("creating ContainerRequest")
-    val priority = Priority.newInstance(0)
-    new ContainerRequest(capability, null, null, priority)
-  }
-
-  /**
-   * Registers the application master with the resource manager, then notifies the
-   * reporting actor with `AppMasterRegistered`.
-   */
-  def registerAppMaster(host: String, port: Int, url: String): Unit = {
-    LOG.info(s"Received RegisterAMMessage! $host:$port $url")
-    client.registerApplicationMaster(host, port, url)
-    LOG.info("Received RegisterAppMasterResponse ")
-    reportTo ! AppMasterRegistered
-  }
-
-  /** Unregisters the application master with final status SUCCEEDED and stops the client. */
-  def shutdownApplication(): Unit = {
-    LOG.info(s"Shutdown application")
-    client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "success", null)
-    client.stop()
-  }
-
-  /**
-   * Unregisters the application master with final status FAILED, using the message of
-   * `ex` as diagnostics, and stops the client.
-   */
-  def failApplication(ex: Throwable): Unit = {
-    LOG.error(s"Application failed! ", ex)
-    // A failed application must not be reported to YARN as SUCCEEDED.
-    client.unregisterApplicationMaster(FinalApplicationStatus.FAILED, ex.getMessage, null)
-    client.stop()
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
deleted file mode 100644
index 7b863fa..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import scala.language.implicitConversions
-
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, YarnApplicationState}
-import org.apache.hadoop.yarn.util.{Records => YarnRecords}
-
-/**
- * Wrappers around the YARN record types, plus the implicit conversions (visible only
- * inside the glue package) that translate between each wrapper and its YARN record.
- *
- * Wrappers that define equality and hash code delegate both to the wrapped record.
- */
-object Records {
-  /** Creates an empty YARN record of the given class. */
-  def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz)
-
-  /** Creates an empty application submission context record. */
-  def newAppSubmissionContext: ApplicationSubmissionContext =
-    YarnRecords.newRecord(classOf[ApplicationSubmissionContext])
-
-  /** Wrapper of a YARN application id. */
-  class ApplicationId(private[glue] val impl: YarnApplicationId) {
-    def getId: Int = impl.getId
-
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: ApplicationId => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  object ApplicationId {
-    def newInstance(timestamp: Long, id: Int): ApplicationId =
-      new ApplicationId(YarnApplicationId.newInstance(timestamp, id))
-  }
-
-  /** Wrapper of a YARN application report. */
-  class ApplicationReport(private[glue] val impl: YarnApplicationReport) {
-    def getApplicationId: ApplicationId = new ApplicationId(impl.getApplicationId)
-
-    def getDiagnostics: String = impl.getDiagnostics
-
-    def getFinishTime: Long = impl.getFinishTime
-
-    def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl
-
-    def getYarnApplicationState: YarnApplicationState = impl.getYarnApplicationState
-
-    override def toString: String = impl.toString
-  }
-
-  /** Wrapper of a YARN resource. */
-  class Resource(private[glue] val impl: YarnResource) {
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: Resource => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  object Resource {
-    def newInstance(memory: Int, vCores: Int): Resource =
-      new Resource(YarnResource.newInstance(memory, vCores))
-  }
-
-  /** Wrapper of a YARN container. */
-  class Container(private[glue] val impl: YarnContainer) {
-    def getId: ContainerId = new ContainerId(impl.getId)
-
-    def getNodeHttpAddress: String = impl.getNodeHttpAddress
-
-    def getNodeId: NodeId = new NodeId(impl.getNodeId)
-
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: Container => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  /** Wrapper of a YARN container id. */
-  class ContainerId(private[glue] val impl: YarnContainerId) {
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: ContainerId => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  object ContainerId {
-    def fromString(worker: String): ContainerId =
-      new ContainerId(YarnContainerId.fromString(worker))
-  }
-
-  /** Wrapper of a YARN node id. */
-  class NodeId(private[glue] val impl: YarnNodeId) {
-    def getHost: String = impl.getHost
-
-    override def toString: String = impl.toString
-
-    override def equals(other: Any): Boolean = other match {
-      case that: NodeId => impl.equals(that.impl)
-      case _ => false
-    }
-
-    override def hashCode(): Int = impl.hashCode()
-  }
-
-  /** Wrapper of a YARN container status. */
-  class ContainerStatus(private[glue] val impl: YarnContainerStatus) {
-    def getDiagnostics: String = impl.getDiagnostics
-
-    def getContainerId: ContainerId = new ContainerId(impl.getContainerId)
-
-    def getExitStatus: Int = impl.getExitStatus
-
-    override def toString: String = impl.toString
-  }
-
-  // Conversions between the wrappers above and the YARN records, one pair per type.
-
-  private[glue] implicit def yarnResourceToResource(res: YarnResource): Resource =
-    new Resource(res)
-
-  private[glue] implicit def resourceToYarnResource(res: Resource): YarnResource =
-    res.impl
-
-  private[glue] implicit def yarnAppIdToAppId(yarn: YarnApplicationId): ApplicationId =
-    new ApplicationId(yarn)
-
-  private[glue] implicit def appIdToYarnAppId(app: ApplicationId): YarnApplicationId =
-    app.impl
-
-  private[glue] implicit def yarnReportToReport(yarn: YarnApplicationReport): ApplicationReport =
-    new ApplicationReport(yarn)
-
-  private[glue] implicit def reportToYarnReport(app: ApplicationReport): YarnApplicationReport =
-    app.impl
-
-  private[glue] implicit def yarnContainerToContainer(yarn: YarnContainer): Container =
-    new Container(yarn)
-
-  private[glue] implicit def containerToYarnContainer(app: Container): YarnContainer =
-    app.impl
-
-  private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus)
-    : ContainerStatus = new ContainerStatus(yarn)
-
-  private[glue] implicit def containerStatusToYarnContainerStatus(app: ContainerStatus)
-    : YarnContainerStatus = app.impl
-
-  private[glue] implicit def containerIdToYarnContainerId(app: ContainerId): YarnContainerId =
-    app.impl
-
-  private[glue] implicit def yarnContainerIdToContainerId(yarn: YarnContainerId): ContainerId =
-    new ContainerId(yarn)
-
-  private[glue] implicit def nodeIdToYarnNodeId(app: NodeId): YarnNodeId =
-    app.impl
-
-  private[glue] implicit def yarnNodeIdToNodeId(yarn: YarnNodeId): NodeId =
-    new NodeId(yarn)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
deleted file mode 100644
index db7d5d7..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import org.apache.hadoop.yarn.api.records.YarnApplicationState
-import org.apache.hadoop.yarn.client.api
-
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
-
-/**
- * Adapter for api.YarnClient
- */
-class YarnClient(yarn: YarnConfig) {
-
- val LOG = LogUtil.getLogger(getClass)
-
- private val client: api.YarnClient = api.YarnClient.createYarnClient
- client.init(yarn.conf)
- client.start()
- LOG.info("Starting YarnClient...")
-
- def createApplication: ApplicationId = {
- val app = client.createApplication()
- val response = app.getNewApplicationResponse()
- LOG.info("Create application, appId: " + response.getApplicationId())
- response.getApplicationId()
- }
-
- def getApplicationReport(appId: ApplicationId): ApplicationReport = {
- client.getApplicationReport(appId)
- }
-
- def submit(
- name: String, appId: ApplicationId, command: String, resource: Resource, queue: String,
- packagePath: String, configPath: String): ApplicationId = {
-
- val appContext = Records.newAppSubmissionContext
- appContext.setApplicationName(name)
- appContext.setApplicationId(appId)
-
- val containerContext = ContainerLaunchContext(yarn.conf, command, packagePath, configPath)
- appContext.setAMContainerSpec(containerContext)
- appContext.setResource(resource)
- appContext.setQueue(queue)
-
- LOG.info(s"Submit Application $appId to YARN...")
- client.submitApplication(appContext)
- }
-
- def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue)
- : ApplicationReport = {
- import org.apache.hadoop.yarn.api.records.YarnApplicationState._
- val terminated = Set(FINISHED, KILLED, FAILED, RUNNING)
- var result: ApplicationReport = null
- var done = false
-
- val start = System.currentTimeMillis()
- def timeout: Boolean = {
- val now = System.currentTimeMillis()
- if (now - start > timeoutMilliseconds) {
- true
- } else {
- false
- }
- }
-
- while (!done && !timeout) {
- val report = client.getApplicationReport(appId)
- val status = report.getYarnApplicationState
- if (terminated.contains(status)) {
- done = true
- result = report
- } else {
- Console.print(".")
- Thread.sleep(1000)
- }
- }
-
- if (timeout) {
- throw new Exception(s"Launch Application $appId timeout...")
- }
- result
- }
-
- def stop(): Unit = {
- client.stop()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
deleted file mode 100644
index 87f199a..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn.glue
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-
-/**
- * wrapper for yarn configuration
- */
-case class YarnConfig(conf: YarnConfiguration = new YarnConfiguration(new Configuration(true))) {
- def writeXml(out: java.io.Writer): Unit = conf.writeXml(out)
-
- def resourceManager: String = {
- conf.get("yarn.resourcemanager.hostname")
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
deleted file mode 100644
index 24adbaa..0000000
--- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.yarn
-
-
-/**
- * YARN facade to decouple Gearpump with YARN.
- */
-package object glue {
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
new file mode 100644
index 0000000..33c3e97
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn
+
+object Constants {
+ val APPMASTER_NAME = "gearpump.yarn.applicationmaster.name"
+ val APPMASTER_COMMAND = "gearpump.yarn.applicationmaster.command"
+ val APPMASTER_MEMORY = "gearpump.yarn.applicationmaster.memory"
+ val APPMASTER_VCORES = "gearpump.yarn.applicationmaster.vcores"
+ val APPMASTER_QUEUE = "gearpump.yarn.applicationmaster.queue"
+
+ val PACKAGE_PATH = "gearpump.yarn.client.package-path"
+ val CONFIG_PATH = "gearpump.yarn.client.config-path"
+
+ val MASTER_COMMAND = "gearpump.yarn.master.command"
+ val MASTER_MEMORY = "gearpump.yarn.master.memory"
+ val MASTER_VCORES = "gearpump.yarn.master.vcores"
+
+ val WORKER_COMMAND = "gearpump.yarn.worker.command"
+ val WORKER_CONTAINERS = "gearpump.yarn.worker.containers"
+ val WORKER_MEMORY = "gearpump.yarn.worker.memory"
+ val WORKER_VCORES = "gearpump.yarn.worker.vcores"
+
+ val SERVICES_ENABLED = "gearpump.yarn.services.enabled"
+
+ val LOCAL_DIRS = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.LOCAL_DIRS.$$()
+ val CONTAINER_ID = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CONTAINER_ID.$$()
+ val LOG_DIR_EXPANSION_VAR = org.apache.hadoop.yarn.api.ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ val NODEMANAGER_HOST = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.NM_HOST.$$()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
new file mode 100644
index 0000000..711506a
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.main.{Master, Worker}
+import org.apache.gearpump.experiments.yarn.Constants._
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants
+
+/** Command to start a YARN container */
+trait Command {
+ def get: String
+ override def toString: String = get
+}
+
+abstract class AbstractCommand extends Command {
+ protected def config: Config
+ def version: String
+ def classPath: Array[String] = {
+ Array(
+ s"conf",
+ s"pack/$version/conf",
+ s"pack/$version/lib/daemon/*",
+ s"pack/$version/lib/*"
+ )
+ }
+
+ protected def buildCommand(
+ java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String])
+ : String = {
+ val exe = config.getString(java)
+
+ s"$exe -cp ${classPath.mkString(":")}:" +
+ "$CLASSPATH " + properties.mkString(" ") +
+ s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr"
+ }
+
+ protected def clazz(any: AnyRef): String = {
+ val name = any.getClass.getName
+ if (name.endsWith("$")) {
+ name.dropRight(1)
+ } else {
+ name
+ }
+ }
+}
+
+case class MasterCommand(config: Config, version: String, masterAddr: HostPort)
+ extends AbstractCommand {
+
+ def get: String = {
+ val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}")
+
+ val properties = Array(
+ s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}",
+ s"-D${Constants.GEARPUMP_HOSTNAME}=${masterAddr.host}",
+ s"-D${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
+ s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
+ s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
+ s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}")
+
+ buildCommand(MASTER_COMMAND, properties, clazz(Master), masterArguments)
+ }
+}
+
+case class WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String)
+ extends AbstractCommand {
+
+ def get: String = {
+ val properties = Array(
+ s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}",
+ s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
+ s"-D${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}",
+ s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
+ s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
+ s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost")
+
+ buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String])
+ }
+}
+
+case class AppMasterCommand(config: Config, version: String, args: Array[String])
+ extends AbstractCommand {
+
+ override val classPath = Array(
+ "conf",
+ s"pack/$version/conf",
+ s"pack/$version/dashboard",
+ s"pack/$version/lib/*",
+ s"pack/$version/lib/daemon/*",
+ s"pack/$version/lib/services/*",
+ s"pack/$version/lib/yarn/*"
+ )
+
+ def get: String = {
+ val properties = Array(
+ s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version",
+ s"-D${Constants.GEARPUMP_FULL_SCALA_VERSION}=$version",
+ s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}",
+ s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}",
+ s"-D${Constants.GEARPUMP_HOSTNAME}=${NODEMANAGER_HOST}")
+
+ val arguments = Array(s"") ++ args
+
+ buildCommand(APPMASTER_COMMAND, properties, clazz(YarnAppMaster),
+ arguments)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala
new file mode 100644
index 0000000..1194f0b
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor._
+import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.services.main.Services
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil}
+
+import scala.concurrent.Future
+
+trait UIFactory {
+ def props(masters: List[HostPort], host: String, port: Int): Props
+}
+
+/** Wrapper of UI server */
+class UIService(masters: List[HostPort], host: String, port: Int) extends Actor {
+ private val LOG = LogUtil.getLogger(getClass)
+
+ private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path)
+ private var configFile: java.io.File = null
+
+ private implicit val dispatcher = context.dispatcher
+
+ override def postStop(): Unit = {
+ if (configFile != null) {
+ configFile.delete()
+ configFile = null
+ }
+
+ // TODO: fix this
+ // Hack around to Kill the UI server
+ Services.kill()
+ }
+
+ override def preStart(): Unit = {
+ Future(start())
+ }
+
+ def start(): Unit = {
+ val mastersArg = masters.mkString(",")
+ LOG.info(s"Launching services -master $mastersArg")
+
+ configFile = java.io.File.createTempFile("uiserver", ".conf")
+
+ val config = context.system.settings.config.
+ withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)).
+ withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)).
+ withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host))
+
+ ClusterConfig.saveConfig(config, configFile)
+
+ val master = masters.head
+
+ ConfigFactory.invalidateCaches()
+ launch(supervisor, master.host, master.port, configFile.toString)
+ }
+
+ // Launch the UI server
+ def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
+ Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort"
+ , "-conf", configFile))
+ }
+
+ override def receive: Receive = {
+ case _ =>
+ LOG.error(s"Unknown message received")
+ }
+}
+
+object UIService extends UIFactory {
+ override def props(masters: List[HostPort], host: String, port: Int): Props = {
+ Props(new UIService(masters, host, port))
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
new file mode 100644
index 0000000..97577fb
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import java.io.IOException
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.util.Timeout
+import com.typesafe.config.ConfigValueFactory
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.GetMethod
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.experiments.yarn.Constants._
+import org.apache.gearpump.experiments.yarn.glue.Records._
+import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util._
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as
+ * YARN containers.
+ *
+ * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker.
+ */
+class YarnAppMaster(rmClient: RMClient, nmClient: NMClient,
+ packagePath: String, hdfsConfDir: String,
+ uiFactory: UIFactory)
+ extends Actor {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+ private val akkaConf = context.system.settings.config
+ private val servicesEnabled = akkaConf.getString(SERVICES_ENABLED).toBoolean
+ private var uiStarted = false
+ private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
+
+ private val port = Util.findFreePort().get
+
+ private val trackingURL = "http://" + host + ":" + port
+
+ // TODO: for now, only one master is supported.
+ private val masterCount = 1
+ private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt
+ private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt
+
+ private var workerCount = akkaConf.getString(WORKER_CONTAINERS).toInt
+ private val workerMemory = akkaConf.getString(WORKER_MEMORY).toInt
+ private val workerVCores = akkaConf.getString(WORKER_VCORES).toInt
+
+ val rootPath = System.getProperty(Constants.GEARPUMP_FULL_SCALA_VERSION)
+
+ rmClient.start(self)
+ nmClient.start(self)
+
+ def receive: Receive = null
+
+ private def registerAppMaster(): Unit = {
+ val target = host + ":" + port
+ rmClient.registerAppMaster(host, port, trackingURL)
+ }
+
+ registerAppMaster
+ context.become(waitForAppMasterRegistered)
+
+ import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster._
+
+ def waitForAppMasterRegistered: Receive = {
+ case AppMasterRegistered =>
+ LOG.info("YarnAppMaster registration completed")
+ requestMasterContainers(masterCount)
+ context.become(startingMasters(remain = masterCount, List.empty[MasterInfo]))
+ }
+
+ private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box {
+ case ContainersAllocated(containers) =>
+ LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: "
+ + containers.size)
+ val count = Math.min(containers.length, remain)
+ val newMasters = (0 until count).toList.map { index =>
+ val container = containers(index)
+ MasterInfo(container.getId, container.getNodeId, launchMaster(container))
+ }
+
+ // Stops un-used containers
+ containers.drop(count).map { container =>
+ nmClient.stopContainer(container.getId, container.getNodeId)
+ }
+
+ context.become(startingMasters(remain, newMasters ++ masters))
+ case ContainerStarted(containerId) =>
+ LOG.info(s"ContainerStarted: container ${containerId} started for master(remain=$remain) ")
+ if (remain > 1) {
+ context.become(startingMasters(remain - 1, masters))
+ } else {
+ requestWorkerContainers(workerCount)
+ context.become(startingWorkers(workerCount, masters, List.empty[WorkerInfo]))
+ }
+ }
+
+ private def box(receive: Receive): Receive = {
+ onError orElse receive orElse unHandled
+ }
+
+ private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo])
+ : Receive = {
+ box {
+ case ContainersAllocated(containers) =>
+ LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " +
+ s"count: " + containers.size)
+
+ val count = Math.min(containers.length, remain)
+ val newWorkers = (0 until count).toList.map { index =>
+ val container = containers(index)
+ launchWorker(container, masters)
+ WorkerInfo(container.getId, container.getNodeId)
+ }
+
+ // Stops un-used containers
+ containers.drop(count).map { container =>
+ nmClient.stopContainer(container.getId, container.getNodeId)
+ }
+ context.become(startingWorkers(remain, masters, workers ++ newWorkers))
+ case ContainerStarted(containerId) =>
+ LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)")
+ // The last one
+ if (remain > 1) {
+ context.become(startingWorkers(remain - 1, masters, workers))
+ } else {
+ if (servicesEnabled && !uiStarted) {
+ context.actorOf(uiFactory.props(masters.map(_.host), host, port))
+ uiStarted = true
+ }
+ context.become(service(effectiveConfig(masters.map(_.host)), masters, workers))
+ }
+ }
+ }
+
+ private def effectiveConfig(masters: List[HostPort]): Config = {
+ val masterList = masters.map(pair => s"${pair.host}:${pair.port}")
+ val config = context.system.settings.config
+ config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
+ ConfigValueFactory.fromIterable(masterList.asJava))
+ }
+
+ private def onError: Receive = {
+ case ContainersCompleted(containers) =>
+ // TODO: we should recover the failed container from this...
+ containers.foreach { status =>
+ if (status.getExitStatus != 0) {
+ LOG.error(s"ContainersCompleted: container ${status.getContainerId}" +
+ s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}")
+ } else {
+ LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed")
+ }
+ }
+ case ShutdownApplication =>
+ LOG.error("ShutdownApplication")
+ nmClient.stop()
+ rmClient.shutdownApplication()
+ context.stop(self)
+ case ResourceManagerException(ex) =>
+ LOG.error("ResourceManagerException: " + ex.getMessage, ex)
+ nmClient.stop()
+ rmClient.failApplication(ex)
+ context.stop(self)
+ case Kill =>
+ LOG.info("Kill: User asked to shutdown the application")
+ sender ! CommandResult(success = true)
+ self ! ShutdownApplication
+ }
+
+ private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo])
+ : Receive = box {
+ case GetActiveConfig(clientHost) =>
+ LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost)
+ val filtered = ClusterConfig.filterOutDefaultConfig(
+ config.withValue(Constants.GEARPUMP_HOSTNAME,
+ ConfigValueFactory.fromAnyRef(clientHost)))
+ sender ! ActiveConfig(filtered)
+ case QueryVersion =>
+ LOG.info("QueryVersion")
+ sender ! Version(Util.version)
+ case QueryClusterInfo =>
+ LOG.info("QueryClusterInfo")
+ val masterContainers = masters.map { master =>
+ master.id.toString + s"(${master.nodeId.toString})"
+ }
+
+ val workerContainers = workers.map { worker =>
+ worker.id.toString + s"(${worker.nodeId.toString})"
+ }
+ sender ! ClusterInfo(masterContainers, workerContainers)
+ case AddMaster =>
+ sender ! CommandResult(success = false, "Not Implemented")
+ case RemoveMaster(masterId) =>
+ sender ! CommandResult(success = false, "Not Implemented")
+ case AddWorker(count) =>
+ if (count == 0) {
+ sender ! CommandResult(success = true)
+ } else {
+ LOG.info("AddWorker: Start to add new workers, count: " + count)
+ workerCount += count
+ requestWorkerContainers(count)
+ context.become(startingWorkers(count, masters, workers))
+ sender ! CommandResult(success = true)
+ }
+ case RemoveWorker(worker) =>
+ val workerId = ContainerId.fromString(worker)
+ LOG.info(s"RemoveWorker: remove worker $workerId")
+ val info = workers.find(_.id.toString == workerId.toString)
+ if (info.isDefined) {
+ nmClient.stopContainer(info.get.id, info.get.nodeId)
+ sender ! CommandResult(success = true)
+ val remainWorkers = workers.filter(_.id != info.get.id)
+ context.become(service(config, masters, remainWorkers))
+ } else {
+ sender ! CommandResult(success = false, "failed to find worker " + worker)
+ }
+ }
+
+ private def unHandled: Receive = {
+ case other =>
+ LOG.info(s"Received unknown message $other")
+ }
+
+ private def requestMasterContainers(masters: Int) = {
+ LOG.info(s"Request resource for masters($masters)")
+ val containers = (1 to masters).map(
+ i => Resource.newInstance(masterMemory, masterVCores)
+ ).toList
+ rmClient.requestContainers(containers)
+ }
+
+ private def launchMaster(container: Container): HostPort = {
+ LOG.info(s"Launch Master on container " + container.getNodeHttpAddress)
+ val host = container.getNodeId.getHost
+
+ val port = Util.findFreePort().get
+
+ LOG.info("=============PORT" + port)
+ val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port))
+ nmClient.launchCommand(container, masterCommand.get, packagePath, hdfsConfDir)
+ HostPort(host, port)
+ }
+
+ private def requestWorkerContainers(workers: Int): Unit = {
+ LOG.info(s"Request resource for workers($workers)")
+ val containers = (1 to workers).map(
+ i => Resource.newInstance(workerMemory, workerVCores)
+ ).toList
+
+ rmClient.requestContainers(containers)
+ }
+
+ private def launchWorker(container: Container, masters: List[MasterInfo]): Unit = {
+ LOG.info(s"Launch Worker on container " + container.getNodeHttpAddress)
+ val workerHost = container.getNodeId.getHost
+ val workerCommand = WorkerCommand(akkaConf, rootPath, masters.head.host, workerHost)
+ nmClient.launchCommand(container, workerCommand.get, packagePath, hdfsConfDir)
+ }
+}
+
+object YarnAppMaster extends AkkaApp with ArgumentsParser {
+ val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "conf" -> CLIOption[String]("<Gearpump configuration directory on HDFS>", required = true),
+ "package" -> CLIOption[String]("<Gearpump package path on HDFS>", required = true)
+ )
+
+ override def akkaConfig: Config = {
+ ClusterConfig.ui()
+ }
+
+ override def main(akkaConf: Config, args: Array[String]): Unit = {
+ implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+ implicit val system = ActorSystem("GearpumpAM", akkaConf)
+
+ val yarnConf = new YarnConfig()
+
+ val confDir = parse(args).getString("conf")
+ val packagePath = parse(args).getString("package")
+
+ LOG.info("HADOOP_CONF_DIR: " + System.getenv("HADOOP_CONF_DIR"))
+ LOG.info("YARN Resource Manager: " + yarnConf.resourceManager)
+
+ val rmClient = new RMClient(yarnConf)
+ val nmClient = new NMClient(yarnConf, akkaConf)
+ val appMaster = system.actorOf(Props(new YarnAppMaster(rmClient,
+ nmClient, packagePath, confDir, UIService)))
+
+ val daemon = system.actorOf(Props(new Daemon(appMaster)))
+ Await.result(system.whenTerminated, Duration.Inf)
+ LOG.info("YarnAppMaster is shutdown")
+ }
+
+ class Daemon(appMaster: ActorRef) extends Actor {
+ context.watch(appMaster)
+
+ override def receive: Actor.Receive = {
+ case Terminated(actor) =>
+ if (actor.compareTo(appMaster) == 0) {
+ LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " +
+ s"shutting down current ActorSystem")
+ context.system.terminate()
+ context.stop(self)
+ }
+ }
+ }
+
+ case class ResourceManagerException(throwable: Throwable)
+ case object ShutdownApplication
+ case class ContainersRequest(containers: List[Resource])
+ case class ContainersAllocated(containers: List[Container])
+ case class ContainersCompleted(containers: List[ContainerStatus])
+ case class ContainerStarted(containerId: ContainerId)
+ case object AppMasterRegistered
+
+ case class GetActiveConfig(clientHost: String)
+
+ case object QueryClusterInfo
+ case class ClusterInfo(masters: List[String], workers: List[String]) {
+ override def toString: String = {
+ val separator = "\n"
+ val masterSection = "masters: " + separator + masters.mkString("\n") + "\n"
+
+ val workerSection = "workers: " + separator + workers.mkString("\n") + "\n"
+ masterSection + workerSection
+ }
+ }
+
+ case object Kill
+ case class ActiveConfig(config: Config)
+
+ case object QueryVersion
+
+ case class Version(version: String)
+
+ case class MasterInfo(id: ContainerId, nodeId: NodeId, host: HostPort)
+
+ case class WorkerInfo(id: ContainerId, nodeId: NodeId)
+
+ def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
+ val client = new HttpClient()
+ val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
+ val get = new GetMethod(appMasterPath)
+ var status = client.executeMethod(get)
+
+ if (status != 200) {
+ // Sleeps a little bit, and try again
+ Thread.sleep(3000)
+ status = client.executeMethod(get)
+ }
+
+ if (status == 200) {
+ AkkaHelper.actorFor(system, get.getResponseBodyAsString)
+ } else {
+ throw new IOException("Fail to resolve AppMaster address, please make sure " +
+ s"${report.getOriginalTrackingUrl} is accessible...")
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
new file mode 100644
index 0000000..9fb69b2
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.IOException
+
+import akka.actor.{ActorRef, ActorSystem}
+import org.apache.commons.httpclient.HttpClient
+import org.apache.commons.httpclient.methods.GetMethod
+import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.gearpump.experiments.yarn.glue.YarnClient
+import org.apache.gearpump.util.{AkkaHelper, LogUtil}
+
+import scala.util.Try
+
+/**
+ * Resolves AppMaster ActorRef
+ */
+class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
+ val LOG = LogUtil.getLogger(getClass)
+ val RETRY_INTERVAL_MS = 3000 // ms
+
+ def resolve(appId: ApplicationId, timeoutSeconds: Int = 30): ActorRef = {
+ val appMaster = retry(connect(appId), 1 + timeoutSeconds * 1000 / RETRY_INTERVAL_MS)
+ appMaster
+ }
+
+ private def connect(appId: ApplicationId): ActorRef = {
+ val report = yarnClient.getApplicationReport(appId)
+ val client = new HttpClient()
+ val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path"
+ LOG.info(s"appMasterPath=$appMasterPath")
+ val get = new GetMethod(appMasterPath)
+ val status = client.executeMethod(get)
+ if (status == 200) {
+ val response = get.getResponseBodyAsString
+ LOG.info("Successfully resolved AppMaster address: " + response)
+ AkkaHelper.actorFor(system, response)
+ } else {
+ throw new IOException("Fail to resolve AppMaster address, please make sure " +
+ s"${report.getOriginalTrackingUrl} is accessible...")
+ }
+ }
+
+ private def retry(fun: => ActorRef, times: Int): ActorRef = {
+ var index = 0
+ var result: ActorRef = null
+ while (index < times && result == null) {
+ Thread.sleep(RETRY_INTERVAL_MS)
+ index += 1
+ val tryConnect = Try(fun)
+ if (tryConnect.isFailure) {
+ LOG.error(s"Failed to connect YarnAppMaster(tried $index)... " +
+ tryConnect.failed.get.getMessage)
+ } else {
+ result = tryConnect.get
+ }
+ }
+ result
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
new file mode 100644
index 0000000..9ec2eae
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.yarn.client
+
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+ /**
+  * Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster
+  *
+  * The first argument selects the command. "launch" is handed to LaunchCluster with the
+  * remaining arguments; every command listed in ManageCluster.commands is handed to
+  * ManageCluster with `-command <name>` put in front of the remaining arguments.
+  */
+ object Client {
+
+   private val LOG: Logger = LogUtil.getLogger(getClass)
+   val LAUNCH = "launch"
+
+   /** Maps every supported command name to the application that handles it. */
+   val commands = Map(LAUNCH -> LaunchCluster) ++
+     ManageCluster.commands.map(key => (key, ManageCluster)).toMap
+
+   /** Prints the sorted list of supported commands to stderr. */
+   def usage(): Unit = {
+     val keys = commands.keys.toList.sorted
+     // scalastyle:off println
+     Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+     // scalastyle:on println
+   }
+
+   /**
+    * Dispatches to the application registered for the first argument, or prints the usage
+    * when there are no arguments or the first argument is not a known command.
+    */
+   def main(args: Array[String]): Unit = {
+     val selected = args.headOption.flatMap(key => commands.get(key).map(app => (key, app)))
+     selected match {
+       case Some((LAUNCH, app)) =>
+         app.main(args.drop(1))
+       case Some((key, app)) =>
+         // ManageCluster takes the command name through its -command option.
+         app.main(Array("-" + ManageCluster.COMMAND, key) ++ args.drop(1))
+       case None =>
+         usage()
+     }
+   }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
new file mode 100644
index 0000000..2475728
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.{File, IOException, OutputStreamWriter}
+import java.net.InetAddress
+import java.util.zip.ZipInputStream
+
+import akka.actor.ActorSystem
+import com.typesafe.config.{Config, ConfigValueFactory}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.experiments.yarn.Constants
+import org.apache.gearpump.experiments.yarn.appmaster.AppMasterCommand
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
+import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
+import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
+import org.apache.gearpump.util.ActorUtil.askActor
+import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
+import org.slf4j.Logger
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+/**
+ * Launch Gearpump on YARN
+ */
+class LaunchCluster(
+ akka: Config,
+ yarnConf: YarnConfig,
+ yarnClient: YarnClient,
+ fs: FileSystem,
+ actorSystem: ActorSystem,
+ appMasterResolver: AppMasterResolver,
+ version: String = Util.version) {
+
+ import org.apache.gearpump.experiments.yarn.Constants._
+ private implicit val dispatcher = actorSystem.dispatcher
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+ private val host = InetAddress.getLocalHost.getHostName
+ private val queue = akka.getString(APPMASTER_QUEUE)
+ private val memory = akka.getString(APPMASTER_MEMORY).toInt
+ private val vcore = akka.getString(APPMASTER_VCORES).toInt
+
+ def submit(appName: String, packagePath: String): ApplicationId = {
+ LOG.info("Starting AM")
+
+ // First step, check the version, to make sure local version matches remote version
+ if (!packagePath.endsWith(".zip")) {
+ throw new IOException(s"YarnClient only support .zip distribution package," +
+ s" now it is ${packagePath}. Please download the zip " +
+ "package from website or use sbt assembly packArchiveZip to build one.")
+ }
+
+ if (!fs.exists(packagePath)) {
+ throw new IOException(s"Cannot find package ${packagePath} on HDFS ${fs.name}. ")
+ }
+
+ val rootEntry = rootEntryPath(zip = packagePath)
+
+ if (!rootEntry.contains(version)) {
+ throw new IOException(s"Check version failed! Local gearpump binary" +
+ s" version $version doesn't match with remote path $packagePath")
+ }
+
+ val resource = Resource.newInstance(memory, vcore)
+ val appId = yarnClient.createApplication
+
+ // uploads the configs to HDFS home directory of current user.
+ val configPath = uploadConfigToHDFS(appId)
+
+ val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
+ s"-package $packagePath"))
+
+ yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)
+
+ LOG.info("Waiting application to finish...")
+ val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
+ LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
+ s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")
+
+ // scalastyle:off println
+ Console.println("================================================")
+ Console.println("==Application Id: " + appId)
+ // scalastyle:on println
+ appId
+ }
+
+ def saveConfig(appId: ApplicationId, output: String): Future[File] = {
+ LOG.info(s"Trying to download active configuration to output path: " + output)
+ LOG.info(s"Resolving YarnAppMaster ActorRef for application " + appId)
+ val appMaster = appMasterResolver.resolve(appId)
+ LOG.info(s"appMaster=${appMaster.path} host=$host")
+ val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config)
+ future.map { config =>
+ val out = new File(output)
+ ClusterConfig.saveConfig(config, out)
+ out
+ }
+ }
+
+ private def uploadConfigToHDFS(appId: ApplicationId): String = {
+ // Uses personal home directory so that it will not conflict with other users
+ // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
+ val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
+ LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")
+
+ // Copies config from local to remote.
+ val remoteConfFile = s"$confDir/gear.conf"
+ var out = fs.create(remoteConfFile)
+ var writer = new OutputStreamWriter(out)
+
+ val cleanedConfig = ClusterConfig.filterOutDefaultConfig(akka)
+
+ writer.write(cleanedConfig.root().render())
+ writer.close()
+
+ // Saves yarn-site.xml to remote
+ val yarn_site_xml = s"$confDir/yarn-site.xml"
+ out = fs.create(yarn_site_xml)
+ writer = new OutputStreamWriter(out)
+ yarnConf.writeXml(writer)
+ writer.close()
+
+ // Saves log4j.properties to remote
+ val log4j_properties = s"$confDir/log4j.properties"
+ val log4j = LogUtil.loadConfiguration
+ out = fs.create(log4j_properties)
+ writer = new OutputStreamWriter(out)
+ log4j.store(writer, "gearpump on yarn")
+ writer.close()
+ confDir.toString
+ }
+
+ private def rootEntryPath(zip: String): String = {
+ val stream = new ZipInputStream(fs.open(zip))
+ val entry = stream.getNextEntry()
+ val name = entry.getName
+ name.substring(0, entry.getName.indexOf("/"))
+ }
+}
+
+ /**
+  * Command line entry point that submits a Gearpump cluster to YARN and, when -output is
+  * given, downloads the active configuration of the new cluster to that path.
+  */
+ object LaunchCluster extends AkkaApp with ArgumentsParser {
+
+   val PACKAGE = "package"
+   val NAME = "name"
+   val VERBOSE = "verbose"
+   val OUTPUT = "output"
+
+   override protected def akkaConfig: Config = {
+     ClusterConfig.default()
+   }
+
+   override val options: Array[(String, CLIOption[Any])] = Array(
+     PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
+       "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
+     NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
+       defaultValue = Some("Gearpump")),
+     VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+       defaultValue = Some(false)),
+     OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+       defaultValue = None)
+   )
+   // Used both for waiting on the submitted application and for saving the configuration.
+   private val TIMEOUT_MILLISECONDS = 30 * 1000
+
+   /**
+    * Submits the cluster and optionally saves its configuration.
+    *
+    * The YARN client is stopped and the actor system is terminated even when submitting
+    * or saving fails, so a failed launch does not leave them running.
+    *
+    * @param inputAkkaConf base configuration; -package overrides its package path
+    * @param args command line arguments, see `options`
+    */
+   override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+     val parsed = parse(args)
+     if (parsed.getBoolean(VERBOSE)) {
+       LogUtil.verboseLogToConsole()
+     }
+
+     val yarnConfig = new YarnConfig()
+     val fs = new FileSystem(yarnConfig)
+     val yarnClient = new YarnClient(yarnConfig)
+     val akkaConf = updateConf(inputAkkaConf, parsed)
+     val actorSystem = ActorSystem("launchCluster", akkaConf)
+
+     try {
+       val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
+
+       val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
+         actorSystem, appMasterResolver)
+
+       val name = parsed.getString(NAME)
+       val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))
+
+       if (parsed.exists(OUTPUT)) {
+         import scala.concurrent.duration._
+         Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
+           TIMEOUT_MILLISECONDS.milliseconds)
+       }
+     } finally {
+       try {
+         yarnClient.stop()
+       } finally {
+         // Runs even if stopping the YARN client throws.
+         actorSystem.terminate()
+         Await.result(actorSystem.whenTerminated, Duration.Inf)
+       }
+     }
+   }
+
+   /** Overrides the package path in `akka` with the -package option when it is present. */
+   private def updateConf(akka: Config, parsed: ParseResult): Config = {
+     if (parsed.exists(PACKAGE)) {
+       akka.withValue(Constants.PACKAGE_PATH,
+         ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
+     } else {
+       akka
+     }
+   }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
new file mode 100644
index 0000000..5b12346
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.client
+
+import java.io.{File, IOException}
+import java.net.InetAddress
+import scala.concurrent.{Await, Future}
+
+import akka.actor.{ActorRef, ActorSystem}
+
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
+import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig}
+import org.apache.gearpump.util.ActorUtil.askActor
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+ /**
+  * Manage current Gearpump cluster on YARN
+  *
+  * Every operation asks the given YarnAppMaster actor and returns the reply as a Future.
+  *
+  * @param appId id of the managed YARN application
+  * @param appMaster ActorRef of the YarnAppMaster that receives the requests
+  * @param system actor system whose dispatcher runs the future callbacks
+  */
+ class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) {
+   import org.apache.gearpump.experiments.yarn.client.ManageCluster._
+
+   private val host = InetAddress.getLocalHost.getHostName
+   implicit val dispatcher = system.dispatcher
+
+   /** Asks for the active configuration, identifying this client by its host name. */
+   def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host))
+
+   /** Asks for the version. */
+   def version: Future[Version] = askActor[Version](appMaster, QueryVersion)
+
+   /** Sends an AddWorker request carrying `count`. */
+   def addWorker(count: Int): Future[CommandResult] = {
+     askActor[CommandResult](appMaster, AddWorker(count))
+   }
+
+   /** Sends a RemoveWorker request carrying `worker`. */
+   def removeWorker(worker: String): Future[CommandResult] = {
+     askActor[CommandResult](appMaster, RemoveWorker(worker))
+   }
+
+   /** Sends a Kill request. */
+   def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill)
+
+   /** Asks for the masters and workers of the cluster. */
+   def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo)
+
+   /**
+    * Runs the operation selected by `command`, reading its arguments from `parsed`.
+    *
+    * @param command one of the names in ManageCluster.commands
+    * @param parsed parsed command line options
+    * @return a Future completed with the reply to the operation
+    * @throws IOException if an option required by the command is missing or empty
+    * @throws IllegalArgumentException if `command` is not a known command
+    */
+   def command(command: String, parsed: ParseResult): Future[AnyRef] = {
+     command match {
+       case GET_CONFIG =>
+         // An empty path is rejected as well, the same way as for the container option
+         // below, since the declared default of the output option is an empty string.
+         val output = if (parsed.exists(OUTPUT)) parsed.getString(OUTPUT) else ""
+         if (output == null || output.isEmpty) {
+           throw new IOException(s"Please specify -$OUTPUT option")
+         } else {
+           getConfig.map { conf =>
+             ClusterConfig.saveConfig(conf.config, new File(output))
+             conf
+           }
+         }
+       case ADD_WORKER =>
+         val count = parsed.getString(COUNT).toInt
+         addWorker(count)
+       case REMOVE_WORKER =>
+         val containerId = parsed.getString(CONTAINER)
+         if (containerId == null || containerId.isEmpty) {
+           throw new IOException(s"Please specify -$CONTAINER option")
+         } else {
+           removeWorker(containerId)
+         }
+       case KILL =>
+         shutdown
+       case QUERY =>
+         queryClusterInfo
+       case VERSION =>
+         version
+       case unknown =>
+         // Reports the accepted commands instead of failing with a bare MatchError.
+         throw new IllegalArgumentException(
+           s"Unknown command '$unknown', expected one of: ${commands.mkString("|")}")
+     }
+   }
+ }
+
+ /**
+  * Command line entry point for managing a Gearpump cluster that already runs on YARN.
+  * Runs the command given with -command against the application given with -appid and
+  * prints the reply.
+  */
+ object ManageCluster extends AkkaApp with ArgumentsParser {
+   val GET_CONFIG = "getconfig"
+   val ADD_WORKER = "addworker"
+   val REMOVE_WORKER = "removeworker"
+   val KILL = "kill"
+   val VERSION = "version"
+   val QUERY = "query"
+   val COMMAND = "command"
+   val CONTAINER = "container"
+   val OUTPUT = "output"
+   val COUNT = "count"
+   val APPID = "appid"
+   val VERBOSE = "verbose"
+
+   /** Names accepted by the -command option. */
+   val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY)
+
+   import scala.concurrent.duration._
+   /** How long main waits for the reply to a command. */
+   val TIME_OUT_SECONDS = 30.seconds
+
+   override val options: Array[(String, CLIOption[Any])] = Array(
+     COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true),
+     APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>",
+       required = true),
+     COUNT -> CLIOption("<how many instance to add or remove>", required = false,
+       defaultValue = Some(1)),
+     VERBOSE -> CLIOption("<print verbose log on console>", required = false,
+       defaultValue = Some(false)),
+     OUTPUT -> CLIOption("<output path for configuration file>", required = false,
+       defaultValue = Some("")),
+     CONTAINER -> CLIOption("<container id for master or worker>", required = false,
+       defaultValue = Some(""))
+   )
+
+   /**
+    * Resolves the AppMaster of the application, runs the requested command and prints the
+    * reply.
+    *
+    * The actor system is terminated and the YARN client is stopped even when resolving
+    * the AppMaster or running the command fails.
+    *
+    * @param akkaConf configuration for the actor system
+    * @param args command line arguments, see `options`
+    */
+   override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+     val yarnConfig = new YarnConfig()
+     val yarnClient = new YarnClient(yarnConfig)
+
+     try {
+       val parsed = parse(args)
+
+       if (parsed.getBoolean(VERBOSE)) {
+         LogUtil.verboseLogToConsole()
+       }
+
+       val appId = parseAppId(parsed.getString(APPID))
+       val system = ActorSystem("manageCluster", akkaConf)
+
+       try {
+         val appMasterResolver = new AppMasterResolver(yarnClient, system)
+         val appMaster = appMasterResolver.resolve(appId)
+
+         val manager = new ManageCluster(appId, appMaster, system)
+
+         val command = parsed.getString(COMMAND)
+         val result = manager.command(command, parsed)
+
+         // scalastyle:off println
+         Console.println(Await.result(result, TIME_OUT_SECONDS))
+         // scalastyle:on println
+       } finally {
+         system.terminate()
+         Await.result(system.whenTerminated, Duration.Inf)
+       }
+     } finally {
+       yarnClient.stop()
+     }
+   }
+
+   /**
+    * Parses an application id of the form `application_<timestamp>_<id>`.
+    *
+    * Only the second and third underscore separated parts are read; the first part and
+    * any further parts are ignored.
+    *
+    * @param str textual application id
+    * @return the ApplicationId built from the timestamp and the id
+    * @throws IllegalArgumentException if a part is missing or is not a number
+    */
+   def parseAppId(str: String): ApplicationId = {
+     def malformed(cause: Throwable): IllegalArgumentException = {
+       new IllegalArgumentException(
+         s"Invalid application id '$str', expected format: application_timestamp_id", cause)
+     }
+
+     str.split("_") match {
+       case Array(_, timestamp, id, _*) =>
+         try {
+           ApplicationId.newInstance(timestamp.toLong, id.toInt)
+         } catch {
+           case ex: NumberFormatException => throw malformed(ex)
+         }
+       case _ =>
+         throw malformed(null)
+     }
+   }
+ }
\ No newline at end of file