You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:48 UTC
[38/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
deleted file mode 100644
index 0568641..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-
-import java.io.File
-import java.lang.management.ManagementFactory
-import java.net.URL
-import java.util.concurrent.{Executors, TimeUnit}
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.SupervisorStrategy.Stop
-import akka.actor._
-import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.AppMasterToWorker._
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
-import io.gearpump.cluster.MasterToWorker._
-import io.gearpump.cluster.WorkerToAppMaster._
-import io.gearpump.cluster.WorkerToMaster._
-import io.gearpump.cluster.master.Master.MasterInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.Worker.ExecutorWatcher
-import io.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.metrics.Metrics.ReportMetrics
-import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import io.gearpump.util.ActorSystemBooter.Daemon
-import io.gearpump.util.Constants._
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import io.gearpump.util.{TimeOutScheduler, _}
-
-/**
- * Worker is used to track the resource on single machine, it is like
- * the node manager of YARN.
- *
- * @param masterProxy masterProxy is used to resolve the master
- */
-private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
- private val systemConfig: Config = context.system.settings.config
-
- private val address = ActorUtil.getFullPath(context.system, self.path)
- private var resource = Resource.empty
- private var allocatedResources = Map[ActorRef, Resource]()
- private var executorsInfo = Map[ActorRef, ExecutorSlots]()
- private var id: WorkerId = WorkerId.unspecified
- private val createdTime = System.currentTimeMillis()
- private var masterInfo: MasterInfo = null
- private var executorNameToActor = Map.empty[String, ActorRef]
- private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
- private val jarStoreService = JarStoreService.get(systemConfig)
- jarStoreService.init(systemConfig, context.system)
-
- private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
- private val resourceUpdateTimeoutMs = 30000 // Milliseconds
-
- private var totalSlots: Int = 0
-
- val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
- var historyMetricsService: Option[ActorRef] = None
-
- override def receive: Receive = null
- var LOG: Logger = LogUtil.getLogger(getClass)
-
- def service: Receive =
- appMasterMsgHandler orElse
- clientMessageHandler orElse
- metricsService orElse
- terminationWatch(masterInfo.master) orElse
- ActorUtil.defaultMsgHandler(self)
-
- def metricsService: Receive = {
- case query: QueryHistoryMetrics =>
- if (historyMetricsService.isEmpty) {
- // Returns empty metrics so that we don't hang the UI
- sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
- } else {
- historyMetricsService.get forward query
- }
- }
-
- private var metricsInitialized = false
-
- val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-
- private def initializeMetrics(): Unit = {
- // Registers jvm metrics
- val metricsSetName = "worker" + WorkerId.render(id)
- Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
-
- historyMetricsService = if (metricsEnabled) {
- val historyMetricsService = {
- context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
- }
-
- val metricsReportService = context.actorOf(Props(
- new MetricsReporterService(Metrics(context.system))))
- historyMetricsService.tell(ReportMetrics, metricsReportService)
- Some(historyMetricsService)
- } else {
- None
- }
- }
-
- def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
-
- // If master get disconnected, the WorkerRegistered may be triggered multiple times.
- case WorkerRegistered(id, masterInfo) =>
- this.id = id
-
- // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
- // itself.
- if (!metricsInitialized) {
- initializeMetrics()
- metricsInitialized = true
- }
-
- this.masterInfo = masterInfo
- timeoutTicker.cancel()
- context.watch(masterInfo.master)
- this.LOG = LogUtil.getLogger(getClass, worker = id)
- LOG.info(s"Worker is registered. " +
- s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
- sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
- resourceUpdateTimeoutMs, updateResourceTimeOut())
- context.become(service)
- }
-
- private def updateResourceTimeOut(): Unit = {
- LOG.error(s"Update worker resource time out")
- }
-
- def appMasterMsgHandler: Receive = {
- case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
- val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
- val executorToStop = executorNameToActor.get(actorName)
- if (executorToStop.isDefined) {
- LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
- s"due to: $reason")
- executorToStop.get.forward(shutdown)
- } else {
- LOG.error(s"Cannot find executor $actorName, ignore this message")
- sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
- }
- case launch: LaunchExecutor =>
- LOG.info(s"$launch")
- if (resource < launch.resource) {
- sender ! ExecutorLaunchRejected("There is no free resource on this machine")
- } else {
- val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
-
- val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
- jarStoreService, executorProcLauncher))
- executorNameToActor += actorName -> executor
-
- resource = resource - launch.resource
- allocatedResources = allocatedResources + (executor -> launch.resource)
-
- reportResourceToMaster()
- executorsInfo += executor ->
- ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
- context.watch(executor)
- }
- case UpdateResourceFailed(reason, ex) =>
- LOG.error(reason)
- context.stop(self)
- case UpdateResourceSucceed =>
- LOG.info(s"Update resource succeed")
- case GetWorkerData(workerId) =>
- val aliveFor = System.currentTimeMillis() - createdTime
- val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
- val userDir = System.getProperty("user.dir")
- sender ! WorkerData(WorkerSummary(
- id, "active",
- address,
- aliveFor,
- logDir,
- executorsInfo.values.toArray,
- totalSlots,
- resource.slots,
- userDir,
- jvmName = ManagementFactory.getRuntimeMXBean().getName(),
- resourceManagerContainerId = systemConfig.getString(
- GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
- historyMetricsConfig = getHistoryMetricsConfig)
- )
- case ChangeExecutorResource(appId, executorId, usedResource) =>
- for (executor <- executorActorRef(appId, executorId);
- allocatedResource <- allocatedResources.get(executor)) {
-
- allocatedResources += executor -> usedResource
- resource = resource + allocatedResource - usedResource
- reportResourceToMaster()
-
- if (usedResource == Resource(0)) {
- allocatedResources -= executor
- // stop executor if there is no resource binded to it.
- LOG.info(s"Shutdown executor $executorId because the resource used is zero")
- executor ! ShutdownExecutor(appId, executorId,
- "Shutdown executor because the resource used is zero")
- }
- }
- }
-
- private def reportResourceToMaster(): Unit = {
- sendMsgWithTimeOutCallBack(masterInfo.master,
- ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
- }
-
- private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
- val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
- executorNameToActor.get(actorName)
- }
-
- def clientMessageHandler: Receive = {
- case QueryWorkerConfig(workerId) =>
- if (this.id == workerId) {
- sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
- } else {
- sender ! WorkerConfig(ConfigFactory.empty)
- }
- }
-
- private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
- repeatActionUtil(
- seconds = timeOutSeconds,
- action = () => {
- masterProxy ! RegisterWorker(workerId)
- },
- onTimeout = () => {
- LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
- s"seconds, abort and kill the worker...")
- self ! PoisonPill
- })
- }
-
- def terminationWatch(master: ActorRef): Receive = {
- case Terminated(actor) =>
- if (actor.compareTo(master) == 0) {
- // Parent master is down, no point to keep worker anymore. Let's make suicide to free
- // resources
- LOG.info(s"Master cannot be contacted, find a new master ...")
- context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
- } else if (ActorUtil.isChildActorPath(self, actor)) {
- // One executor is down,
- LOG.info(s"Executor is down ${getExecutorName(actor)}")
-
- val allocated = allocatedResources.get(actor)
- if (allocated.isDefined) {
- resource = resource + allocated.get
- executorsInfo -= actor
- allocatedResources = allocatedResources - actor
- sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
- resourceUpdateTimeoutMs, updateResourceTimeOut())
- }
- }
- }
-
- private def getExecutorName(actorRef: ActorRef): Option[String] = {
- executorNameToActor.find(_._2 == actorRef).map(_._1)
- }
-
- private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
- val launcherClazz = Class.forName(
- systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
- launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
- .asInstanceOf[ExecutorProcessLauncher]
- }
-
- import context.dispatcher
- override def preStart(): Unit = {
- LOG.info(s"RegisterNewWorker")
- totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
- this.resource = Resource(totalSlots)
- masterProxy ! RegisterNewWorker
- context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
- }
-
- private def registerTimeoutTicker(seconds: Int): Cancellable = {
- repeatActionUtil(seconds, () => Unit, () => {
- LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
- s"abort and kill the worker...")
- self ! PoisonPill
- })
- }
-
- private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
- : Cancellable = {
- val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
- Duration(2, TimeUnit.SECONDS))(action())
- val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
- new Cancellable {
- def cancel(): Boolean = {
- val result1 = cancelTimeout.cancel()
- val result2 = cancelSuicide.cancel()
- result1 && result2
- }
-
- def isCancelled: Boolean = {
- cancelTimeout.isCancelled && cancelSuicide.isCancelled
- }
- }
- }
-
- override def postStop(): Unit = {
- LOG.info(s"Worker is going down....")
- ioPool.shutdown()
- context.system.terminate()
- }
-}
-
-private[cluster] object Worker {
-
- case class ExecutorResult(result: Try[Int])
-
- class ExecutorWatcher(
- launch: LaunchExecutor,
- masterInfo: MasterInfo,
- ioPool: ExecutionContext,
- jarStoreService: JarStoreService,
- procLauncher: ExecutorProcessLauncher) extends Actor {
- import launch.{appId, executorId, resource}
-
- private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
-
- val executorConfig: Config = {
- val workerConfig = context.system.settings.config
-
- val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
- Option(jvmConfig.executorAkkaConfig)
- }.getOrElse(ConfigFactory.empty())
-
- resolveExecutorConfig(workerConfig, submissionConfig)
- }
-
- // For some config, worker has priority, for others, user Application submission config
- // have priorities.
- private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
- val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
- .withoutPath(GEARPUMP_CLUSTER_MASTERS)
- .withoutPath(GEARPUMP_HOME)
- .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
- .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
- // Falls back to workerConfig
- .withFallback(workerConfig)
-
- // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
- val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
- val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
- LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
- config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
- } else {
- config
- }
-
- // Excludes reference.conf, and JVM properties..
- ClusterConfig.filterOutDefaultConfig(updatedConf)
- }
-
- implicit val executorService = ioPool
-
- private val executorHandler = {
- val ctx = launch.executorJvmConfig
-
- if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
- new ExecutorHandler {
- val exitPromise = Promise[Int]()
- val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
-
- override def destroy(): Unit = {
- context.stop(app)
- }
- override def exitValue: Future[Int] = {
- exitPromise.future
- }
- }
- } else {
- createProcess(ctx)
- }
- }
-
- private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
-
- val process = Future {
- val jarPath = ctx.jar.map { appJar =>
- val tempFile = File.createTempFile(appJar.name, ".jar")
- jarStoreService.copyToLocalFile(tempFile, appJar.filePath)
- val file = new URL("file:" + tempFile)
- file.getFile
- }
-
- val configFile = {
- val configFile = File.createTempFile("gearpump", ".conf")
- ClusterConfig.saveConfig(executorConfig, configFile)
- val file = new URL("file:" + configFile)
- file.getFile
- }
-
- val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
- ctx.classPath.map(path => expandEnviroment(path)) ++
- jarPath.map(Array(_)).getOrElse(Array.empty[String])
-
- val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
- val logArgs = List(
- s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
- s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
- s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
- s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
- val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
-
- val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
-
- // Remote debug executor process
- val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
- val remoteDebugConfig = if (remoteDebugFlag) {
- val availablePort = Util.findFreePort().get
- List(
- "-Xdebug",
- s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
- s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
- )
- } else {
- List.empty[String]
- }
-
- val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
- val verboseGCConfig = if (verboseGCFlag) {
- List(
- s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
- "-verbose:gc",
- "-XX:+PrintGCDetails",
- "-XX:+PrintGCDateStamps",
- "-XX:+PrintTenuringDistribution",
- "-XX:+PrintGCApplicationConcurrentTime",
- "-XX:+PrintGCApplicationStoppedTime"
- )
- } else {
- List.empty[String]
- }
-
- val ipv4 = List(s"-D${PREFER_IPV4}=true")
-
- val options = ctx.jvmArguments ++ username ++
- logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
-
- LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
- val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
- options, classPath, ctx.mainClass, ctx.arguments)
-
- ProcessInfo(process, jarPath, configFile)
- }
-
- new ExecutorHandler {
-
- var destroyed = false
-
- override def destroy(): Unit = {
- LOG.info(s"Destroy executor process ${ctx.mainClass}")
- if (!destroyed) {
- destroyed = true
- process.foreach { info =>
- info.process.destroy()
- info.jarPath.foreach(new File(_).delete())
- new File(info.configFile).delete()
- }
- }
- }
-
- override def exitValue: Future[Int] = {
- process.flatMap { info =>
- val exit = info.process.exitValue()
- if (exit == 0) {
- Future.successful(0)
- } else {
- Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
- s"error summary: ${info.process.logger.error}"))
- }
- }
- }
- }
- }
-
- private def expandEnviroment(path: String): String = {
- // TODO: extend this to support more environment.
- path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
- }
-
- override def preStart(): Unit = {
- executorHandler.exitValue.onComplete { value =>
- procLauncher.cleanProcess(appId, executorId)
- val result = ExecutorResult(value)
- self ! result
- }
- }
-
- override def postStop(): Unit = {
- executorHandler.destroy()
- }
-
- // The folders are under ${GEARPUMP_HOME}
- val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
- File.separator + "yarn")
-
- override def receive: Receive = {
- case ShutdownExecutor(appId, executorId, reason: String) =>
- executorHandler.destroy()
- sender ! ShutdownExecutorSucceed(appId, executorId)
- context.stop(self)
- case ExecutorResult(executorResult) =>
- executorResult match {
- case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
- case Failure(e) => LOG.error("Executor exit with errors", e)
- }
- context.stop(self)
- }
-
- private def getFormatedTime(timestamp: Long): String = {
- val datePattern = "yyyy-MM-dd-HH-mm"
- val format = new java.text.SimpleDateFormat(datePattern)
- format.format(timestamp)
- }
-
- private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
- classPath.filterNot(matchDaemonPattern(_))
- }
-
- private def matchDaemonPattern(path: String): Boolean = {
- daemonPathPattern.exists(path.contains(_))
- }
- }
-
- trait ExecutorHandler {
- def destroy(): Unit
- def exitValue: Future[Int]
- }
-
- case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
-
- /**
- * Starts the executor in the same JVM as worker.
- */
- class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
- extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
- private val exitCode = 0
-
- override val supervisorStrategy =
- OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
- case ex: Throwable =>
- LOG.error(s"system $name stopped ", ex)
- exit.failure(ex)
- Stop
- }
-
- override def postStop(): Unit = {
- if (!exit.isCompleted) {
- exit.success(exitCode)
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
deleted file mode 100644
index 305bdc1..0000000
--- a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.jarstore.dfs
-
-import java.io.File
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.slf4j.Logger
-
-import io.gearpump.jarstore.{FilePath, JarStoreService}
-import io.gearpump.util.{Constants, LogUtil}
-
-/**
- * DFSJarStoreService store the uploaded jar on HDFS
- */
-class DFSJarStoreService extends JarStoreService {
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private var rootPath: Path = null
-
- override val scheme: String = "hdfs"
-
- override def init(config: Config, actorRefFactory: ActorSystem): Unit = {
- rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
- val fs = rootPath.getFileSystem(new Configuration())
- if (!fs.exists(rootPath)) {
- fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
- }
- }
-
- /**
- * This function will copy the remote file to local file system, called from client side.
- *
- * @param localFile The destination of file path
- * @param remotePath The remote file path from JarStore
- */
- override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
- LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${remotePath}")
- val filePath = new Path(rootPath, remotePath.path)
- val fs = filePath.getFileSystem(new Configuration())
- val target = new Path(localFile.toURI().toString)
- fs.copyToLocalFile(filePath, target)
- }
-
- /**
- * This function will copy the local file to the remote JarStore, called from client side.
- *
- * @param localFile The local file
- */
- override def copyFromLocal(localFile: File): FilePath = {
- val remotePath = FilePath(Math.abs(new java.util.Random().nextLong()).toString)
- LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${remotePath}")
- val filePath = new Path(rootPath, remotePath.path)
- val fs = filePath.getFileSystem(new Configuration())
- fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath)
- remotePath
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
deleted file mode 100644
index fa1a240..0000000
--- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.jarstore.local
-
-import java.io.File
-
-import akka.actor.{Actor, Stash}
-import akka.pattern.pipe
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import io.gearpump.util._
-
-/**
- * LocalJarStore store the uploaded jar on local disk.
- */
-class LocalJarStore(rootDirPath: String) extends Actor with Stash {
- private val LOG: Logger = LogUtil.getLogger(getClass)
-
- val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
- val rootDirectory = new File(rootDirPath)
-
- FileUtils.forceMkdir(rootDirectory)
-
- val server = new FileServer(context.system, host, 0, rootDirectory)
-
- implicit val timeout = Constants.FUTURE_TIMEOUT
- implicit val executionContext = context.dispatcher
-
- server.start pipeTo self
-
- def receive: Receive = {
- case FileServer.Port(port) =>
- context.become(listen(port))
- unstashAll()
- case _ =>
- stash()
- }
-
- def listen(port: Int): Receive = {
- case GetJarStoreServer =>
- sender ! JarStoreServerAddress(s"http://$host:$port/")
- }
-
- override def postStop(): Unit = {
- server.stop
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
deleted file mode 100644
index 969ce90..0000000
--- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.jarstore.local
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.jarstore.{FilePath, JarStoreService}
-import io.gearpump.util._
-
-/**
- * LocalJarStoreService store the uploaded jar on local disk.
- */
-class LocalJarStoreService extends JarStoreService {
- private def LOG: Logger = LogUtil.getLogger(getClass)
- private implicit val timeout = Constants.FUTURE_TIMEOUT
- private var system: akka.actor.ActorSystem = null
- private var master: ActorRef = null
- private implicit def dispatcher: ExecutionContext = system.dispatcher
-
- override val scheme: String = "file"
-
- override def init(config: Config, system: ActorSystem): Unit = {
- this.system = system
- val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
- .asScala.flatMap(Util.parseHostList)
- master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
- }
-
- private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]]
- .map { address =>
- val client = new FileServer.Client(system, address.url)
- client
- }
-
- /**
- * This function will copy the remote file to local file system, called from client side.
- *
- * @param localFile The destination of file path
- * @param remotePath The remote file path from JarStore
- */
- override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
- LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
- val future = client.flatMap(_.download(remotePath, localFile))
- Await.ready(future, Duration(60, TimeUnit.SECONDS))
- }
-
- /**
- * This function will copy the local file to the remote JarStore, called from client side.
- * @param localFile The local file
- */
- override def copyFromLocal(localFile: File): FilePath = {
- val future = client.flatMap(_.upload(localFile))
- Await.result(future, Duration(60, TimeUnit.SECONDS))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
deleted file mode 100644
index 1824a22..0000000
--- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.stream.Materializer
-import akka.stream.scaladsl.FileIO
-import akka.util.ByteString
-
-/**
- * FileDirective is a set of Akka-http directive to upload/download
- * huge binary files to/from Akka-Http server.
- */
-object FileDirective {
-
- // Form field name
- type Name = String
-
- val CHUNK_SIZE = 262144
-
- /**
- * File information after a file is uploaded to server.
- *
- * @param originFileName original file name when user upload it in browser.
- * @param file file name after the file is saved to server.
- * @param length the length of the file
- */
- case class FileInfo(originFileName: String, file: File, length: Long)
-
- class Form(val fields: Map[Name, FormField]) {
- def getFile(fieldName: String): Option[FileInfo] = {
- fields.get(fieldName).flatMap {
- case Left(file) => Option(file)
- case Right(_) => None
- }
- }
-
- def getValue(fieldName: String): Option[String] = {
- fields.get(fieldName).flatMap {
- case Left(_) => None
- case Right(value) => Option(value)
- }
- }
- }
-
- type FormField = Either[FileInfo, String]
-
- /**
- * directive to uploadFile, it store the uploaded files
- * to temporary directory, and return a Map from form field name
- * to FileInfo.
- */
- def uploadFile: Directive1[Form] = {
- uploadFileTo(null)
- }
-
- /**
- * Store the uploaded files to specific rootDirectory.
- *
- * @param rootDirectory directory to store the files.
- * @return
- */
- def uploadFileTo(rootDirectory: File): Directive1[Form] = {
- Directive[Tuple1[Form]] { inner =>
- extractMaterializer {implicit mat =>
- extractExecutionContext {implicit ec =>
- uploadFileImpl(rootDirectory)(mat, ec) { filesFuture =>
- ctx => {
- filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx))
- }
- }
- }
- }
- }
- }
-
- // Downloads file from server
- def downloadFile(file: File): Route = {
- val responseEntity = HttpEntity(
- MediaTypes.`application/octet-stream`,
- file.length,
- FileIO.fromFile(file, CHUNK_SIZE))
- complete(responseEntity)
- }
-
- private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext)
- : Directive1[Future[Form]] = {
- Directive[Tuple1[Future[Form]]] { inner =>
- entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
- val form = formdata.parts.mapAsync(1) { p =>
- if (p.filename.isDefined) {
-
- // Reserve the suffix
- val targetPath = File.createTempFile(s"userfile_${p.name}_",
- s"${p.filename.getOrElse("")}", rootDirectory)
- val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
- written.map(written =>
- if (written.count > 0) {
- Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count)))
- } else {
- Map.empty[Name, FormField]
- })
- } else {
- val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) {(total, input) =>
- total ++ input
- }
- valueFuture.map{value =>
- Map(p.name -> Right(value.utf8String))
- }
- }
- }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) =>
- new Form(set.fields ++ value)
- }
-
- inner(Tuple1(form))
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
deleted file mode 100644
index bf389f7..0000000
--- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{FileIO, Sink, Source}
-import spray.json.DefaultJsonProtocol._
-import spray.json.JsonFormat
-
-import io.gearpump.jarstore.FilePath
-import io.gearpump.util.FileDirective._
-import io.gearpump.util.FileServer.Port
-
-/**
- * A simple file server implemented with akka-http to store/fetch large
- * binary files.
- */
-class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory: File) {
- import system.dispatcher
- implicit val actorSystem = system
- implicit val materializer = ActorMaterializer()
- implicit def ec: ExecutionContext = system.dispatcher
-
- val route: Route = {
- path("upload") {
- uploadFileTo(rootDirectory) { form =>
- val fileName = form.fields.headOption.flatMap { pair =>
- val (_, fileInfo) = pair
- fileInfo match {
- case Left(file) => Option(file.file).map(_.getName)
- case Right(_) => None
- }
- }
-
- if (fileName.isDefined) {
- complete(fileName.get)
- } else {
- failWith(new Exception("File not found in the uploaded form"))
- }
- }
- } ~
- path("download") {
- parameters("file") { file: String =>
- downloadFile(new File(rootDirectory, file))
- }
- } ~
- pathEndOrSingleSlash {
- extractUri { uri =>
- val upload = uri.withPath(Uri.Path("/upload")).toString()
- val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
- s"""
- |
- |<h2>Please specify a file to upload:</h2>
- |<form action="$upload" enctype="multipart/form-data" method="post">
- |<input type="file" name="datafile" size="40">
- |</p>
- |<div>
- |<input type="submit" value="Submit">
- |</div>
- |</form>
- """.stripMargin)
- complete(entity)
- }
- }
- }
-
- private var connection: Future[ServerBinding] = null
-
- def start: Future[Port] = {
- connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
- connection.map(address => Port(address.localAddress.getPort))
- }
-
- def stop: Future[Unit] = {
- connection.flatMap(_.unbind())
- }
-}
-
-object FileServer {
-
- implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply)
-
- case class Port(port: Int)
-
- /**
- * Client of [[io.gearpump.util.FileServer]]
- */
- class Client(system: ActorSystem, host: String, port: Int) {
-
- def this(system: ActorSystem, url: String) = {
- this(system, Uri(url).authority.host.address(), Uri(url).authority.port)
- }
-
- private implicit val actorSystem = system
- private implicit val materializer = ActorMaterializer()
- private implicit val ec = system.dispatcher
-
- val server = Uri(s"http://$host:$port")
- val httpClient = Http(system).outgoingConnection(server.authority.host.address(),
- server.authority.port)
-
- def upload(file: File): Future[FilePath] = {
- val target = server.withPath(Path("/upload"))
-
- val request = entity(file).map { entity =>
- HttpRequest(HttpMethods.POST, uri = target, entity = entity)
- }
-
- val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
- response.flatMap { some =>
- Unmarshal(some).to[String]
- }.map { path =>
- FilePath(path)
- }
- }
-
- def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
- val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
- // Download file to local
- val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
- val downloaded = response.flatMap { response =>
- response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
- }
- downloaded.map(written => Unit)
- }
-
- private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
- val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
- FileIO.fromFile(file, chunkSize = 100000))
- val body = Source.single(
- Multipart.FormData.BodyPart(
- "uploadfile",
- entity,
- Map("filename" -> file.getName)))
- val form = Multipart.FormData(body)
-
- Marshal(form).to[RequestEntity]
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
new file mode 100644
index 0000000..9e55be6
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+
+/**
+ * Cluster Bootup Flow
+ */
+object WorkerToMaster {
+
+ /** When an worker is started, it sends RegisterNewWorker */
+ case object RegisterNewWorker
+
+ /** When worker lose connection with master, it tries to register itself again with old Id. */
+ case class RegisterWorker(workerId: WorkerId)
+
+ /** Worker is responsible to broadcast its current status to master */
+ case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
+}
+
+object MasterToWorker {
+
+ /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
+ case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
+
+ /** Worker have not received reply from master */
+ case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
+
+ /** Master is synced with worker on resource slots managed by current worker */
+ case object UpdateResourceSucceed
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
new file mode 100644
index 0000000..9bde4d1
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.embedded
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.{Config, ConfigValueFactory}
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
+import org.apache.gearpump.util.{LogUtil, Util}
+
+/**
+ * Create a in-process cluster with single worker
+ */
+class EmbeddedCluster(inputConfig: Config) {
+
+ private val workerCount: Int = 1
+ private var _master: ActorRef = null
+ private var _system: ActorSystem = null
+ private var _config: Config = null
+
+ private val LOG = LogUtil.getLogger(getClass)
+
+ def start(): Unit = {
+ val port = Util.findFreePort().get
+ val akkaConf = getConfig(inputConfig, port)
+ _config = akkaConf
+ val system = ActorSystem(MASTER, akkaConf)
+
+ val master = system.actorOf(Props[MasterActor], MASTER)
+
+ 0.until(workerCount).foreach { id =>
+ system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+ }
+ this._master = master
+ this._system = system
+
+ LOG.info("=================================")
+ LOG.info("Local Cluster is started at: ")
+ LOG.info(s" 127.0.0.1:$port")
+ LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+ }
+
+ private def getConfig(inputConfig: Config, port: Int): Config = {
+ val config = inputConfig.
+ withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+ withValue(GEARPUMP_CLUSTER_MASTERS,
+ ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+ withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
+ ConfigValueFactory.fromAnyRef(true)).
+ withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
+ withValue("akka.actor.provider",
+ ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+ config
+ }
+
+ def newClientContext: ClientContext = {
+ ClientContext(_config, _system, _master)
+ }
+
+ def stop(): Unit = {
+ _system.stop(_master)
+ _system.terminate()
+ Await.result(_system.whenTerminated, Duration.Inf)
+ }
+}
+
+object EmbeddedCluster {
+ def apply(): EmbeddedCluster = {
+ new EmbeddedCluster(ClusterConfig.master())
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
new file mode 100644
index 0000000..98ec707
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import java.io.File
+import java.net.{URL, URLClassLoader}
+import java.util.jar.JarFile
+import scala.util.Try
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+
+/** Tool to submit an application jar to cluster */
+object AppSubmitter extends AkkaApp with ArgumentsParser {
+ val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val ignoreUnknownArgument = true
+
+ override val description = "Submit an application to Master by providing a jar"
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
+ defaultValue = Some("")),
+ "jar" -> CLIOption("<application>.jar", required = true),
+ "executors" -> CLIOption[Int]("number of executor to launch", required = false,
+ defaultValue = Some(1)),
+ "verbose" -> CLIOption("<print verbose log on console>", required = false,
+ defaultValue = Some(false)),
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ val config = parse(args)
+ if (null != config) {
+
+ val verbose = config.getBoolean("verbose")
+ if (verbose) {
+ LogUtil.verboseLogToConsole()
+ }
+
+ val jar = config.getString("jar")
+
+ // Set jar path to be submitted to cluster
+ System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+ System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
+
+ val namePrefix = config.getString("namePrefix")
+ if (namePrefix.nonEmpty) {
+ if (!Util.validApplicationName(namePrefix)) {
+ throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+ }
+ System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
+ }
+
+ val jarFile = new java.io.File(jar)
+
+ // Start main class
+ if (!jarFile.exists()) {
+ throw new Exception(s"jar $jar does not exist")
+ }
+
+ val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" +
+ jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader())
+ val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
+
+ // Set to context classloader. ActorSystem pick context classloader in preference
+ Thread.currentThread().setContextClassLoader(classLoader)
+ val clazz = classLoader.loadClass(main)
+ val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+ mainMethod.invoke(null, arguments)
+ }
+ }
+
+ private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
+ : (String, Array[String]) = {
+ val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.
+ getValue("Main-Class")).getOrElse("")
+
+ if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) {
+ (remainArgs(0), remainArgs.drop(1))
+ } else if (mainInManifest.nonEmpty) {
+ (mainInManifest, remainArgs)
+ } else {
+ throw new Exception("No main class specified")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
new file mode 100644
index 0000000..672fee6
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+object Gear {
+
+ val OPTION_CONFIG = "conf"
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
+ "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
+
+ def usage(): Unit = {
+ val keys = commands.keys.toList.sorted
+ // scalastyle:off println
+ Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+ // scalastyle:on println
+ }
+
+ private def executeCommand(command: String, commandArgs: Array[String]) = {
+ commands.get(command).map(_.main(commandArgs))
+ if (!commands.contains(command)) {
+ val allArgs = (command +: commandArgs.toList).toArray
+ MainRunner.main(allArgs)
+ }
+ }
+
+ def main(inputArgs: Array[String]): Unit = {
+ val (configFile, args) = extractConfig(inputArgs)
+ if (configFile != null) {
+ // Sets custom config file...
+ System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
+ }
+
+ if (args.length == 0) {
+ usage()
+ } else {
+ val command = args(0)
+ val commandArgs = args.drop(1)
+ executeCommand(command, commandArgs)
+ }
+ }
+
+ private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = {
+ var index = 0
+
+ var result = List.empty[String]
+ var configFile: String = null
+ while (index < inputArgs.length) {
+ val item = inputArgs(index)
+ if (item == s"-$OPTION_CONFIG") {
+ index += 1
+ configFile = inputArgs(index)
+ } else {
+ result = result :+ item
+ }
+ index += 1
+ }
+ (configFile, result.toArray)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
new file mode 100644
index 0000000..bf444a3
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Tool to query master info */
+object Info extends AkkaApp with ArgumentsParser {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ override val description = "Query the Application list"
+
+ // scalastyle:off println
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+ val client = ClientContext(akkaConf)
+
+ val AppMastersData(appMasters) = client.listApps
+ Console.println("== Application Information ==")
+ Console.println("====================================")
+ appMasters.foreach { appData =>
+ Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " +
+ s"status: ${appData.status}, worker: ${appData.workerPath}")
+ }
+ client.close()
+ }
+ // scalastyle:on println
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
new file mode 100644
index 0000000..17f6214
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Tool to kill an App */
+object Kill extends AkkaApp with ArgumentsParser {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "appid" -> CLIOption("<application id>", required = true),
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ override val description = "Kill an application with application Id"
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+
+ if (null != config) {
+ val client = ClientContext(akkaConf)
+ LOG.info("Client ")
+ client.shutdown(config.getInt("appid"))
+ client.close()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
new file mode 100644
index 0000000..db71b7b
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
+
+object Local extends AkkaApp with ArgumentsParser {
+ override def akkaConfig: Config = ClusterConfig.master()
+
+ var LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] =
+ Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
+ "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
+ defaultValue = Some(2)))
+
+ override val description = "Start a local cluster"
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ this.LOG = {
+ LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
+ LogUtil.getLogger(getClass)
+ }
+
+ val config = parse(args)
+ if (null != config) {
+ local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
+ }
+ }
+
+ def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
+ if (sameProcess) {
+ LOG.info("Starting local in same process")
+ System.setProperty("LOCAL", "true")
+ }
+ val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+ .asScala.flatMap(Util.parseHostList)
+ val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
+
+ if (masters.size != 1 && masters.head.host != local) {
+ LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
+ s"with ${Constants.GEARPUMP_HOSTNAME}")
+ } else {
+
+ val hostPort = masters.head
+ implicit val system = ActorSystem(MASTER, akkaConf.
+ withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
+ )
+
+ val master = system.actorOf(Props[MasterActor], MASTER)
+ val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
+
+ 0.until(workerCount).foreach { id =>
+ system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+ }
+
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
new file mode 100644
index 0000000..c6c9f10
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Tool to run any main class by providing a jar */
+object MainRunner extends AkkaApp with ArgumentsParser {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+ val mainClazz = args(0)
+ val commandArgs = args.drop(1)
+
+ val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz)
+ val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+ mainMethod.invoke(null, commandArgs)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
new file mode 100644
index 0000000..f1b9bdf
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor._
+import akka.cluster.ClusterEvent._
+import akka.cluster.ddata.DistributedData
+import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
+import akka.cluster.{Cluster, Member, MemberStatus}
+import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
+import org.apache.gearpump.cluster.master.Master.MasterListUpdated
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
+
+object Master extends AkkaApp with ArgumentsParser {
+
+ private var LOG: Logger = LogUtil.getLogger(getClass)
+
+ override def akkaConfig: Config = ClusterConfig.master()
+
+ override val options: Array[(String, CLIOption[Any])] =
+ Array("ip" -> CLIOption[String]("<master ip address>", required = true),
+ "port" -> CLIOption("<master port>", required = true))
+
+ override val description = "Start Master daemon"
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+
+ this.LOG = {
+ LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
+ LogUtil.getLogger(getClass)
+ }
+
+ val config = parse(args)
+ master(config.getString("ip"), config.getInt("port"), akkaConf)
+ }
+
+ private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
+ masters.exists { hostPort =>
+ hostPort == s"$master:$port"
+ }
+ }
+
+ private def master(ip: String, port: Int, akkaConf: Config): Unit = {
+ val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+
+ if (!verifyMaster(ip, port, masters)) {
+ LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
+ s"gearpump.cluster.masters: ${masters.mkString(", ")}")
+ System.exit(-1)
+ }
+
+ val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
+ val quorum = masterList.size() / 2 + 1
+ val masterConfig = akkaConf.
+ withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+ withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
+ withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
+ withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
+ ConfigValueFactory.fromAnyRef(quorum))
+
+ LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
+ val system = ActorSystem(MASTER, masterConfig)
+
+ val replicator = DistributedData(system).replicator
+ LOG.info(s"Replicator path: ${replicator.path}")
+
+ // Starts singleton manager
+ val singletonManager = system.actorOf(ClusterSingletonManager.props(
+ singletonProps = Props(classOf[MasterWatcher], MASTER),
+ terminationMessage = PoisonPill,
+ settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
+ .withRole(MASTER)),
+ name = SINGLETON_MANAGER)
+
+ // Start master proxy
+ val masterProxy = system.actorOf(ClusterSingletonProxy.props(
+ singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
+ // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
+ // Master is created when there is a majority of machines started.
+ settings = ClusterSingletonProxySettings(system)
+ .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+ name = MASTER
+ )
+
+ LOG.info(s"master proxy is started at ${masterProxy.path}")
+
+ val mainThread = Thread.currentThread()
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ override def run(): Unit = {
+ if (!system.whenTerminated.isCompleted) {
+ LOG.info("Triggering shutdown hook....")
+
+ system.stop(masterProxy)
+ val cluster = Cluster(system)
+ cluster.leave(cluster.selfAddress)
+ cluster.down(cluster.selfAddress)
+ try {
+ Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+ } catch {
+ case ex: Exception => // Ignore
+ }
+ system.terminate()
+ mainThread.join()
+ }
+ }
+ })
+
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+}
+
+class MasterWatcher(role: String) extends Actor with ActorLogging {
+ import context.dispatcher
+
+ val cluster = Cluster(context.system)
+
+ val config = context.system.settings.config
+ val masters = config.getList("akka.cluster.seed-nodes")
+ val quorum = masters.size() / 2 + 1
+
+ val system = context.system
+
+ // Sorts by age, oldest first
+ val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
+ var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
+
+ def receive: Receive = null
+
+ // Subscribes to MemberEvent, re-subscribe when restart
+ override def preStart(): Unit = {
+ cluster.subscribe(self, classOf[MemberEvent])
+ context.become(waitForInit)
+ }
+ override def postStop(): Unit = {
+ cluster.unsubscribe(self)
+ }
+
+ def matchingRole(member: Member): Boolean = member.hasRole(role)
+
+ def waitForInit: Receive = {
+ case state: CurrentClusterState => {
+ membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
+ m.status == MemberStatus.Up && matchingRole(m))
+
+ if (membersByAge.size < quorum) {
+ membersByAge.iterator.mkString(",")
+ log.info(s"We cannot get a quorum, $quorum, " +
+ s"shutting down...${membersByAge.iterator.mkString(",")}")
+ context.become(waitForShutdown)
+ self ! MasterWatcher.Shutdown
+ } else {
+ val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
+ notifyMasterMembersChange(master)
+ context.become(waitForClusterEvent(master))
+ }
+ }
+ }
+
+ def waitForClusterEvent(master: ActorRef): Receive = {
+ case MemberUp(m) if matchingRole(m) => {
+ membersByAge += m
+ notifyMasterMembersChange(master)
+ }
+ case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
+ mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
+ log.info(s"member removed ${mEvent.member}")
+ val m = mEvent.member
+ membersByAge -= m
+ if (membersByAge.size < quorum) {
+ log.info(s"We cannot get a quorum, $quorum, " +
+ s"shutting down...${membersByAge.iterator.mkString(",")}")
+ context.become(waitForShutdown)
+ self ! MasterWatcher.Shutdown
+ } else {
+ notifyMasterMembersChange(master)
+ }
+ }
+ }
+
+ private def notifyMasterMembersChange(master: ActorRef): Unit = {
+ val masters = membersByAge.toList.map{ member =>
+ MasterNode(member.address.host.getOrElse("Unknown-Host"),
+ member.address.port.getOrElse(0))
+ }
+ master ! MasterListUpdated(masters)
+ }
+
+ def waitForShutdown: Receive = {
+ case MasterWatcher.Shutdown => {
+ cluster.unsubscribe(self)
+ cluster.leave(cluster.selfAddress)
+ context.stop(self)
+ system.scheduler.scheduleOnce(Duration.Zero) {
+ try {
+ Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+ } catch {
+ case ex: Exception => // Ignore
+ }
+ system.terminate()
+ }
+ }
+ }
+}
+
+object MasterWatcher {
+ object Shutdown
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
new file mode 100644
index 0000000..d721832
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+// Internal tool to restart an application
+object Replay extends AkkaApp with ArgumentsParser {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ override val options: Array[(String, CLIOption[Any])] = Array(
+ "appid" -> CLIOption("<application id>", required = true),
+ // For document purpose only, OPTION_CONFIG option is not used here.
+ // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+ defaultValue = None))
+
+ override val description = "Replay the application from current min clock(low watermark)"
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+ val config = parse(args)
+
+ if (null != config) {
+ val client = ClientContext(akkaConf)
+ client.replayFromTimestampWindowTrailingEdge(config.getInt("appid"))
+ client.close()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
new file mode 100644
index 0000000..58a9dec
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, Props}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Tool to start a worker daemon process */
+object Worker extends AkkaApp with ArgumentsParser {
+ protected override def akkaConfig = ClusterConfig.worker()
+
+ override val description = "Start a worker daemon"
+
+ var LOG: Logger = LogUtil.getLogger(getClass)
+
+ private def uuid = java.util.UUID.randomUUID.toString
+
+ def main(akkaConf: Config, args: Array[String]): Unit = {
+ val id = uuid
+
+ this.LOG = {
+ LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
+ // Delay creation of LOG instance to avoid creating an empty log file as we
+ // reset the log file name here
+ LogUtil.getLogger(getClass)
+ }
+
+ val system = ActorSystem(id, akkaConf)
+
+ val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address =>
+ val hostAndPort = address.split(":")
+ HostPort(hostAndPort(0), hostAndPort(1).toInt)
+ }
+
+ LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...")
+ val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}")
+
+ system.actorOf(Props(classOf[WorkerActor], masterProxy),
+ classOf[WorkerActor].getSimpleName + id)
+
+ Await.result(system.whenTerminated, Duration.Inf)
+ }
+}
\ No newline at end of file