You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:48 UTC

[38/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
deleted file mode 100644
index 0568641..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.worker
-
-import java.io.File
-import java.lang.management.ManagementFactory
-import java.net.URL
-import java.util.concurrent.{Executors, TimeUnit}
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.SupervisorStrategy.Stop
-import akka.actor._
-import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.AppMasterToWorker._
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
-import io.gearpump.cluster.MasterToWorker._
-import io.gearpump.cluster.WorkerToAppMaster._
-import io.gearpump.cluster.WorkerToMaster._
-import io.gearpump.cluster.master.Master.MasterInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.Worker.ExecutorWatcher
-import io.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.metrics.Metrics.ReportMetrics
-import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import io.gearpump.util.ActorSystemBooter.Daemon
-import io.gearpump.util.Constants._
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import io.gearpump.util.{TimeOutScheduler, _}
-
-/**
- * Worker is used to track the resource on single machine, it is like
- * the node manager of YARN.
- *
- * On start it registers itself with the master (via `masterProxy`). Afterwards it launches
- * executors on behalf of AppMasters, tracks the resource bound to each executor and reports
- * the remaining free resource to the master whenever it changes.
- *
- * @param masterProxy masterProxy is used to resolve the master
- */
-private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
-  private val systemConfig: Config = context.system.settings.config
-
-  private val address = ActorUtil.getFullPath(context.system, self.path)
-  // Resource on this machine that is not bound to any executor.
-  private var resource = Resource.empty
-  // Resource currently bound to each executor watcher actor.
-  private var allocatedResources = Map[ActorRef, Resource]()
-  // Executor summaries returned in WorkerSummary.
-  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
-  private var id: WorkerId = WorkerId.unspecified
-  private val createdTime = System.currentTimeMillis()
-  private var masterInfo: MasterInfo = null
-  // Name built by ActorUtil.actorNameForExecutor(appId, executorId) -> executor watcher actor.
-  private var executorNameToActor = Map.empty[String, ActorRef]
-  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
-  private val jarStoreService = JarStoreService.get(systemConfig)
-  jarStoreService.init(systemConfig, context.system)
-
-  // Handed to every ExecutorWatcher for its blocking work.
-  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
-  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
-
-  private var totalSlots: Int = 0
-
-  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-  var historyMetricsService: Option[ActorRef] = None
-
-  // Placeholder only: preStart() switches to waitForMasterConfirm via context.become.
-  override def receive: Receive = Actor.emptyBehavior
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  /** Behavior once the worker has been registered with a master. */
-  def service: Receive =
-    appMasterMsgHandler orElse
-      clientMessageHandler orElse
-      metricsService orElse
-      terminationWatch(masterInfo.master) orElse
-      ActorUtil.defaultMsgHandler(self)
-
-  /** Forwards history-metrics queries, or answers with empty metrics when disabled. */
-  def metricsService: Receive = {
-    case query: QueryHistoryMetrics =>
-      historyMetricsService match {
-        case Some(metricsActor) =>
-          metricsActor forward query
-        case None =>
-          // Returns empty metrics so that we don't hang the UI
-          sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
-      }
-  }
-
-  private var metricsInitialized = false
-
-  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-
-  /**
-   * Registers the JVM metrics of this worker. When metrics are enabled, it also starts the
-   * history metrics service and a reporter that feeds it.
-   */
-  private def initializeMetrics(): Unit = {
-    // Registers jvm metrics
-    val metricsSetName = "worker" + WorkerId.render(id)
-    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
-
-    historyMetricsService = if (metricsEnabled) {
-      val historyMetricsService = {
-        context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
-      }
-
-      val metricsReportService = context.actorOf(Props(
-        new MetricsReporterService(Metrics(context.system))))
-      historyMetricsService.tell(ReportMetrics, metricsReportService)
-      Some(historyMetricsService)
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Behavior while waiting for the master to acknowledge registration.
-   *
-   * @param timeoutTicker retry/suicide timer of the pending registration; it is cancelled
-   *                      once WorkerRegistered arrives.
-   */
-  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
-
-    // If master get disconnected, the WorkerRegistered may be triggered multiple times.
-    case WorkerRegistered(id, masterInfo) =>
-      this.id = id
-
-      // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
-      // itself.
-      if (!metricsInitialized) {
-        initializeMetrics()
-        metricsInitialized = true
-      }
-
-      this.masterInfo = masterInfo
-      timeoutTicker.cancel()
-      context.watch(masterInfo.master)
-      this.LOG = LogUtil.getLogger(getClass, worker = id)
-      LOG.info(s"Worker is registered. " +
-        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
-      reportResourceToMaster()
-      context.become(service)
-  }
-
-  private def updateResourceTimeOut(): Unit = {
-    LOG.error(s"Update worker resource time out")
-  }
-
-  /** Handles executor lifecycle and resource requests coming from AppMasters / the master. */
-  def appMasterMsgHandler: Receive = {
-    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
-      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
-      executorNameToActor.get(actorName) match {
-        case Some(executorToStop) =>
-          LOG.info(s"Shutdown executor ${actorName}(${executorToStop.path.toString}) " +
-            s"due to: $reason")
-          executorToStop.forward(shutdown)
-        case None =>
-          LOG.error(s"Cannot find executor $actorName, ignore this message")
-          sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
-      }
-    case launch: LaunchExecutor =>
-      LOG.info(s"$launch")
-      if (resource < launch.resource) {
-        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
-      } else {
-        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
-
-        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
-          jarStoreService, executorProcLauncher))
-        executorNameToActor += actorName -> executor
-
-        resource = resource - launch.resource
-        allocatedResources = allocatedResources + (executor -> launch.resource)
-
-        reportResourceToMaster()
-        executorsInfo += executor ->
-          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
-        context.watch(executor)
-      }
-    case UpdateResourceFailed(reason, ex) =>
-      LOG.error(reason)
-      context.stop(self)
-    case UpdateResourceSucceed =>
-      LOG.info(s"Update resource succeed")
-    case GetWorkerData(workerId) =>
-      val aliveFor = System.currentTimeMillis() - createdTime
-      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
-      val userDir = System.getProperty("user.dir")
-      sender ! WorkerData(WorkerSummary(
-        id, "active",
-        address,
-        aliveFor,
-        logDir,
-        executorsInfo.values.toArray,
-        totalSlots,
-        resource.slots,
-        userDir,
-        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
-        resourceManagerContainerId = systemConfig.getString(
-          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
-        historyMetricsConfig = getHistoryMetricsConfig)
-      )
-    case ChangeExecutorResource(appId, executorId, usedResource) =>
-      for (executor <- executorActorRef(appId, executorId);
-        allocatedResource <- allocatedResources.get(executor)) {
-
-        allocatedResources += executor -> usedResource
-        resource = resource + allocatedResource - usedResource
-        reportResourceToMaster()
-
-        if (usedResource == Resource(0)) {
-          allocatedResources -= executor
-          // stop executor if there is no resource bound to it.
-          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
-          executor ! ShutdownExecutor(appId, executorId,
-            "Shutdown executor because the resource used is zero")
-        }
-      }
-  }
-
-  /** Sends the current free resource to the master, logging an error if it is not acked. */
-  private def reportResourceToMaster(): Unit = {
-    sendMsgWithTimeOutCallBack(masterInfo.master,
-      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
-  }
-
-  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
-    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
-    executorNameToActor.get(actorName)
-  }
-
-  /** Replies with this worker's non-default config, or an empty config for another id. */
-  def clientMessageHandler: Receive = {
-    case QueryWorkerConfig(workerId) =>
-      if (this.id == workerId) {
-        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-      } else {
-        sender ! WorkerConfig(ConfigFactory.empty)
-      }
-  }
-
-  /** Re-sends RegisterWorker every 2 seconds; kills the worker after `timeOutSeconds`. */
-  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
-    repeatActionUtil(
-      seconds = timeOutSeconds,
-      action = () => {
-        masterProxy ! RegisterWorker(workerId)
-      },
-      onTimeout = () => {
-        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
-          s"seconds, abort and kill the worker...")
-        self ! PoisonPill
-      })
-  }
-
-  /**
-   * Reacts to the death of the master or of one of this worker's executor watchers.
-   *
-   * @param master the master this worker is currently registered with
-   */
-  def terminationWatch(master: ActorRef): Receive = {
-    case Terminated(actor) =>
-      if (actor.compareTo(master) == 0) {
-        // Master is down: try to re-register with a (possibly new) master. The worker kills
-        // itself if no registration is confirmed within 30 seconds.
-        LOG.info(s"Master cannot be contacted, find a new master ...")
-        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
-      } else if (ActorUtil.isChildActorPath(self, actor)) {
-        // One executor is down,
-        LOG.info(s"Executor is down ${getExecutorName(actor)}")
-
-        // Always forget the executor, even if its resource was already released by a
-        // ChangeExecutorResource(..., Resource(0)). Otherwise it would linger in
-        // WorkerSummary, and a ShutdownExecutor would be forwarded to a dead actor.
-        executorsInfo -= actor
-        executorNameToActor = executorNameToActor.filterNot(_._2 == actor)
-
-        allocatedResources.get(actor).foreach { allocated =>
-          resource = resource + allocated
-          allocatedResources -= actor
-          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
-            resourceUpdateTimeoutMs, updateResourceTimeOut())
-        }
-      }
-  }
-
-  private def getExecutorName(actorRef: ActorRef): Option[String] = {
-    executorNameToActor.find(_._2 == actorRef).map(_._1)
-  }
-
-  /** Instantiates the launcher class named by GEARPUMP_EXECUTOR_PROCESS_LAUNCHER. */
-  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
-    val launcherClazz = Class.forName(
-      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
-    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
-      .asInstanceOf[ExecutorProcessLauncher]
-  }
-
-  import context.dispatcher
-  override def preStart(): Unit = {
-    LOG.info(s"RegisterNewWorker")
-    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
-    this.resource = Resource(totalSlots)
-    masterProxy ! RegisterNewWorker
-    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
-  }
-
-  /** Kills the worker unless it is cancelled within `seconds`; the repeated action is a no-op. */
-  private def registerTimeoutTicker(seconds: Int): Cancellable = {
-    repeatActionUtil(seconds, () => (), () => {
-      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
-        s"abort and kill the worker...")
-      self ! PoisonPill
-    })
-  }
-
-  /**
-   * Runs `action` immediately and then every 2 seconds, and runs `onTimeout` once after
-   * `seconds`. The returned Cancellable cancels both schedules.
-   */
-  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
-    : Cancellable = {
-    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
-      Duration(2, TimeUnit.SECONDS))(action())
-    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
-    new Cancellable {
-      def cancel(): Boolean = {
-        val result1 = cancelTimeout.cancel()
-        val result2 = cancelSuicide.cancel()
-        result1 && result2
-      }
-
-      def isCancelled: Boolean = {
-        cancelTimeout.isCancelled && cancelSuicide.isCancelled
-      }
-    }
-  }
-
-  override def postStop(): Unit = {
-    LOG.info(s"Worker is going down....")
-    ioPool.shutdown()
-    context.system.terminate()
-  }
-}
-
-private[cluster] object Worker {
-
-  /** Message an ExecutorWatcher sends to itself once its executor has exited. */
-  case class ExecutorResult(result: Try[Int])
-
-  /**
-   * Child actor of the Worker that runs a single executor. It stops itself when that executor
-   * exits or is shut down.
-   *
-   * The executor runs either inside the worker JVM (InJvmExecutor) or as a separate process,
-   * depending on GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS in the resolved config.
-   *
-   * @param launch the launch request (app id, executor id, resource, JVM config)
-   * @param masterInfo master info; its start time is passed to the executor process
-   * @param ioPool execution context used for jar download, process start and exit wait
-   * @param jarStoreService service used to fetch the application jar
-   * @param procLauncher creates and cleans up executor processes
-   */
-  class ExecutorWatcher(
-      launch: LaunchExecutor,
-      masterInfo: MasterInfo,
-      ioPool: ExecutionContext,
-      jarStoreService: JarStoreService,
-      procLauncher: ExecutorProcessLauncher) extends Actor {
-    import launch.{appId, executorId, resource}
-
-    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
-
-    /** Config handed to the executor: submission config merged over the worker config. */
-    val executorConfig: Config = {
-      val workerConfig = context.system.settings.config
-
-      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
-        Option(jvmConfig.executorAkkaConfig)
-      }.getOrElse(ConfigFactory.empty())
-
-      resolveExecutorConfig(workerConfig, submissionConfig)
-    }
-
-    // These keys always come from the worker config: hostname, masters, home, daemon log dir
-    // and the share-same-process flag. Every other key prefers the application submission
-    // config and falls back to the worker config.
-    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
-      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
-        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
-        .withoutPath(GEARPUMP_HOME)
-        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
-        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
-        // Falls back to workerConfig
-        .withFallback(workerConfig)
-
-      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
-      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
-      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
-        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
-        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
-      } else {
-        config
-      }
-
-      // Excludes reference.conf, and JVM properties..
-      ClusterConfig.filterOutDefaultConfig(updatedConf)
-    }
-
-    // The folders are under ${GEARPUMP_HOME}.
-    // Must be initialized before executorHandler. That val starts a Future on ioPool which
-    // reads this field (via filterOutDaemonLib) and may run before the constructor finishes.
-    val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
-      File.separator + "yarn")
-
-    implicit val executorService = ioPool
-
-    private val executorHandler = {
-      val ctx = launch.executorJvmConfig
-
-      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
-        new ExecutorHandler {
-          val exitPromise = Promise[Int]()
-          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
-
-          override def destroy(): Unit = {
-            context.stop(app)
-          }
-          override def exitValue: Future[Int] = {
-            exitPromise.future
-          }
-        }
-      } else {
-        createProcess(ctx)
-      }
-    }
-
-    /**
-     * Asynchronously (on ioPool) prepares and starts the executor JVM process. Preparation
-     * covers downloading the app jar, writing the executor config to a temp file, and building
-     * the classpath and JVM options.
-     *
-     * @return a handler that destroys the process (and its temp files) and exposes its exit
-     *         value; a non-zero exit value is reported as a failed Future.
-     */
-    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
-
-      val process = Future {
-        val jarPath = ctx.jar.map { appJar =>
-          val tempFile = File.createTempFile(appJar.name, ".jar")
-          jarStoreService.copyToLocalFile(tempFile, appJar.filePath)
-          val file = new URL("file:" + tempFile)
-          file.getFile
-        }
-
-        val configFile = {
-          val configFile = File.createTempFile("gearpump", ".conf")
-          ClusterConfig.saveConfig(executorConfig, configFile)
-          val file = new URL("file:" + configFile)
-          file.getFile
-        }
-
-        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
-          ctx.classPath.map(path => expandEnviroment(path)) ++
-          jarPath.map(Array(_)).getOrElse(Array.empty[String])
-
-        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
-        val logArgs = List(
-          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
-          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
-          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
-          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
-        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
-
-        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
-
-        // Remote debug executor process
-        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
-        val remoteDebugConfig = if (remoteDebugFlag) {
-          val availablePort = Util.findFreePort().get
-          List(
-            "-Xdebug",
-            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
-            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
-          )
-        } else {
-          List.empty[String]
-        }
-
-        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
-        val verboseGCConfig = if (verboseGCFlag) {
-          List(
-            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
-            "-verbose:gc",
-            "-XX:+PrintGCDetails",
-            "-XX:+PrintGCDateStamps",
-            "-XX:+PrintTenuringDistribution",
-            "-XX:+PrintGCApplicationConcurrentTime",
-            "-XX:+PrintGCApplicationStoppedTime"
-          )
-        } else {
-          List.empty[String]
-        }
-
-        val ipv4 = List(s"-D${PREFER_IPV4}=true")
-
-        val options = ctx.jvmArguments ++ username ++
-          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
-
-        LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
-        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
-          options, classPath, ctx.mainClass, ctx.arguments)
-
-        ProcessInfo(process, jarPath, configFile)
-      }
-
-      new ExecutorHandler {
-
-        // Guards against destroying twice (ShutdownExecutor and postStop both call destroy()).
-        var destroyed = false
-
-        override def destroy(): Unit = {
-          LOG.info(s"Destroy executor process ${ctx.mainClass}")
-          if (!destroyed) {
-            destroyed = true
-            process.foreach { info =>
-              info.process.destroy()
-              info.jarPath.foreach(new File(_).delete())
-              new File(info.configFile).delete()
-            }
-          }
-        }
-
-        override def exitValue: Future[Int] = {
-          process.flatMap { info =>
-            val exit = info.process.exitValue()
-            if (exit == 0) {
-              Future.successful(0)
-            } else {
-              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
-              s"error summary: ${info.process.logger.error}"))
-            }
-          }
-        }
-      }
-    }
-
-    /** Replaces the `<GEARPUMP_HOME>` placeholder in a classpath entry. */
-    private def expandEnviroment(path: String): String = {
-      // TODO: extend this to support more environment.
-      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
-    }
-
-    override def preStart(): Unit = {
-      executorHandler.exitValue.onComplete { value =>
-        procLauncher.cleanProcess(appId, executorId)
-        val result = ExecutorResult(value)
-        self ! result
-      }
-    }
-
-    override def postStop(): Unit = {
-      executorHandler.destroy()
-    }
-
-    override def receive: Receive = {
-      case ShutdownExecutor(appId, executorId, reason: String) =>
-        executorHandler.destroy()
-        sender ! ShutdownExecutorSucceed(appId, executorId)
-        context.stop(self)
-      case ExecutorResult(executorResult) =>
-        executorResult match {
-          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
-          case Failure(e) => LOG.error("Executor exit with errors", e)
-        }
-        context.stop(self)
-    }
-
-    private def getFormatedTime(timestamp: Long): String = {
-      val datePattern = "yyyy-MM-dd-HH-mm"
-      val format = new java.text.SimpleDateFormat(datePattern)
-      format.format(timestamp)
-    }
-
-    /** Drops worker-only (daemon/yarn) jars from the classpath given to the executor. */
-    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
-      classPath.filterNot(matchDaemonPattern(_))
-    }
-
-    private def matchDaemonPattern(path: String): Boolean = {
-      daemonPathPattern.exists(path.contains(_))
-    }
-  }
-
-  /** Control handle for a running executor, in-JVM or as a separate process. */
-  trait ExecutorHandler {
-    def destroy(): Unit
-    def exitValue: Future[Int]
-  }
-
-  /** A started executor process plus the temp files created for it. */
-  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
-
-  /**
-   * Starts the executor in  the same JVM as worker.
-   *
-   * `exit` is completed with a failure when a child actor fails, or with 0 when this actor
-   * stops normally.
-   */
-  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
-    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
-    private val exitCode = 0
-
-    override val supervisorStrategy =
-      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
-        case ex: Throwable =>
-          LOG.error(s"system $name stopped ", ex)
-          // tryFailure: several children may fail, and completing the promise twice throws.
-          exit.tryFailure(ex)
-          Stop
-      }
-
-    override def postStop(): Unit = {
-      // No-op when the promise was already failed by the supervisor strategy.
-      exit.trySuccess(exitCode)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
deleted file mode 100644
index 305bdc1..0000000
--- a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.jarstore.dfs
-
-import java.io.File
-
-import akka.actor.ActorSystem
-import com.typesafe.config.Config
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
-import org.slf4j.Logger
-
-import io.gearpump.jarstore.{FilePath, JarStoreService}
-import io.gearpump.util.{Constants, LogUtil}
-
-/**
- * DFSJarStoreService store the uploaded jar on HDFS
- *
- * Jars are stored as files directly under GEARPUMP_APP_JAR_STORE_ROOT_PATH, using a random
- * non-negative numeric file name.
- */
-class DFSJarStoreService extends JarStoreService {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  // Set by init(); root directory of the jar store.
-  private var rootPath: Path = null
-
-  override val scheme: String = "hdfs"
-
-  /**
-   * Resolves the jar store root from GEARPUMP_APP_JAR_STORE_ROOT_PATH and creates the
-   * directory if it does not exist yet.
-   */
-  override def init(config: Config, actorRefFactory: ActorSystem): Unit = {
-    rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
-    val fs = rootPath.getFileSystem(new Configuration())
-    if (!fs.exists(rootPath)) {
-      // NOTE(review): the directory is created with rwx for user, group and others;
-      // confirm that a world-writable jar store is acceptable for the deployment.
-      fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
-    }
-  }
-
-  /**
-   * This function will copy the remote file to local file system, called from client side.
-   *
-   * @param localFile The destination of file path
-   * @param remotePath The remote file path from JarStore
-   */
-  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
-    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${remotePath}")
-    val filePath = new Path(rootPath, remotePath.path)
-    val fs = filePath.getFileSystem(new Configuration())
-    val target = new Path(localFile.toURI().toString)
-    fs.copyToLocalFile(filePath, target)
-  }
-
-  /**
-   * This function will copy the local file to the remote JarStore, called from client side.
-   *
-   * @param localFile The local file
-   * @return the path of the stored file, relative to the jar store root
-   */
-  override def copyFromLocal(localFile: File): FilePath = {
-    // nextLong(bound) is always in [0, Long.MaxValue). Math.abs(nextLong()) would stay
-    // negative for Long.MinValue and produce a "-..." file name.
-    val remotePath = FilePath(
-      java.util.concurrent.ThreadLocalRandom.current().nextLong(Long.MaxValue).toString)
-    LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${remotePath}")
-    val filePath = new Path(rootPath, remotePath.path)
-    val fs = filePath.getFileSystem(new Configuration())
-    fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath)
-    remotePath
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
deleted file mode 100644
index fa1a240..0000000
--- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.jarstore.local
-
-import java.io.File
-
-import akka.actor.{Actor, Stash}
-import akka.pattern.pipe
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import io.gearpump.util._
-
-/**
- * LocalJarStore store the uploaded jar on local disk.
- *
- * It serves `rootDirPath` through a FileServer bound to an ephemeral port on
- * GEARPUMP_HOSTNAME, and answers GetJarStoreServer with that server's URL. Requests
- * arriving before the port is known are stashed.
- */
-class LocalJarStore(rootDirPath: String) extends Actor with Stash {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
-  val rootDirectory = new File(rootDirPath)
-
-  FileUtils.forceMkdir(rootDirectory)
-
-  val server = new FileServer(context.system, host, 0, rootDirectory)
-
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  implicit val executionContext = context.dispatcher
-
-  // The outcome arrives as FileServer.Port on success, or as Status.Failure via pipeTo.
-  server.start pipeTo self
-
-  /** Initial behavior: waits for the file server to start, stashing everything else. */
-  def receive: Receive = {
-    case FileServer.Port(port) =>
-      context.become(listen(port))
-      unstashAll()
-    case akka.actor.Status.Failure(ex) =>
-      // Without this case the failure would be stashed like any other message, and every
-      // GetJarStoreServer request would wait forever. Fail the actor so that its supervisor
-      // can react; Stash puts the stashed requests back in the mailbox on restart.
-      LOG.error("Failed to start the jar store file server", ex)
-      throw ex
-    case _ =>
-      stash()
-  }
-
-  /** Behavior once the file server is bound to `port`. */
-  def listen(port: Int): Receive = {
-    case GetJarStoreServer =>
-      sender ! JarStoreServerAddress(s"http://$host:$port/")
-  }
-
-  override def postStop(): Unit = {
-    server.stop
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
deleted file mode 100644
index 969ce90..0000000
--- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.jarstore.local
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.pattern.ask
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.jarstore.{FilePath, JarStoreService}
-import io.gearpump.util._
-
-/**
- * LocalJarStoreService store the uploaded jar on local disk.
- *
- * Jars are transferred over HTTP to and from the file server whose URL the master returns
- * for GetJarStoreServer.
- */
-class LocalJarStoreService extends JarStoreService {
-  private def LOG: Logger = LogUtil.getLogger(getClass)
-  private implicit val timeout = Constants.FUTURE_TIMEOUT
-  private var system: akka.actor.ActorSystem = null
-  private var master: ActorRef = null
-  private implicit def dispatcher: ExecutionContext = system.dispatcher
-
-  override val scheme: String = "file"
-
-  /** Creates a MasterProxy actor for the masters listed in GEARPUMP_CLUSTER_MASTERS. */
-  override def init(config: Config, system: ActorSystem): Unit = {
-    this.system = system
-    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
-      .asScala.flatMap(Util.parseHostList)
-    master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
-  }
-
-  // Resolved once on first use. NOTE(review): a failed lookup is cached as well, so later
-  // calls keep failing until this service instance is recreated.
-  private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]]
-    .map { address =>
-      val client = new FileServer.Client(system, address.url)
-      client
-    }
-
-  /**
-   * This function will copy the remote file to local file system, called from client side.
-   *
-   * Blocks for up to 60 seconds. A failed or timed-out download is rethrown to the caller.
-   *
-   * @param localFile The destination of file path
-   * @param remotePath The remote file path from JarStore
-   */
-  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
-    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
-    val future = client.flatMap(_.download(remotePath, localFile))
-    // Await.result rather than Await.ready: ready() would silently discard a failed download
-    // and leave the caller with an empty or partial local file.
-    Await.result(future, Duration(60, TimeUnit.SECONDS))
-  }
-
-  /**
-   * This function will copy the local file to the remote JarStore, called from client side.
-   *
-   * Blocks for up to 60 seconds. A failed or timed-out upload is rethrown to the caller.
-   *
-   * @param localFile The local file
-   */
-  override def copyFromLocal(localFile: File): FilePath = {
-    val future = client.flatMap(_.upload(localFile))
-    Await.result(future, Duration(60, TimeUnit.SECONDS))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
deleted file mode 100644
index 1824a22..0000000
--- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.stream.Materializer
-import akka.stream.scaladsl.FileIO
-import akka.util.ByteString
-
-/**
- * FileDirective is a set of Akka-http directives to upload/download
- * huge binary files to/from an Akka-http server.
- */
-object FileDirective {
-
-  // Form field name
-  type Name = String
-
-  // Size, in bytes, of the chunks read from disk when serving a download.
-  val CHUNK_SIZE = 262144
-
-  /**
-   * File information after a file is uploaded to server.
-   *
-   * @param originFileName original file name, as sent by the uploading client.
-   * @param file the file the upload was saved to on the server.
-   * @param length the length of the file, in bytes
-   */
-  case class FileInfo(originFileName: String, file: File, length: Long)
-
-  /** A parsed multipart form: each field holds either an uploaded file or a string value. */
-  class Form(val fields: Map[Name, FormField]) {
-
-    /** The uploaded file for `fieldName`; None if the field is missing or is not a file. */
-    def getFile(fieldName: String): Option[FileInfo] = {
-      fields.get(fieldName).flatMap {
-        case Left(file) => Option(file)
-        case Right(_) => None
-      }
-    }
-
-    /** The string value for `fieldName`; None if the field is missing or is a file. */
-    def getValue(fieldName: String): Option[String] = {
-      fields.get(fieldName).flatMap {
-        case Left(_) => None
-        case Right(value) => Option(value)
-      }
-    }
-  }
-
-  /** A form field value: Left for an uploaded file, Right for a plain string value. */
-  type FormField = Either[FileInfo, String]
-
-  /**
-   * Directive to upload files. Uploaded files are stored in the default temporary-file
-   * directory, and the directive extracts a [[Form]] keyed by form field name.
-   */
-  def uploadFile: Directive1[Form] = {
-    // A null directory makes File.createTempFile use the default temporary-file directory.
-    uploadFileTo(null)
-  }
-
-  /**
-   * Store the uploaded files to specific rootDirectory.
-   *
-   * @param rootDirectory directory to store the files in; null means the default
-   *                      temporary-file directory.
-   * @return a directive extracting the parsed [[Form]] once all parts have been consumed
-   */
-  def uploadFileTo(rootDirectory: File): Directive1[Form] = {
-    Directive[Tuple1[Form]] { inner =>
-      extractMaterializer { implicit mat =>
-        extractExecutionContext { implicit ec =>
-          uploadFileImpl(rootDirectory)(mat, ec) { filesFuture =>
-            ctx => {
-              filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx))
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /** Route that streams `file` back as application/octet-stream, read in CHUNK_SIZE chunks. */
-  def downloadFile(file: File): Route = {
-    val responseEntity = HttpEntity(
-      MediaTypes.`application/octet-stream`,
-      file.length,
-      FileIO.fromFile(file, CHUNK_SIZE))
-    complete(responseEntity)
-  }
-
-  /**
-   * Consumes a multipart form one part at a time. Parts that carry a filename are written to a
-   * temp file under `rootDirectory`; other parts are read fully into a UTF-8 string. The
-   * extracted future completes with the combined [[Form]].
-   */
-  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext)
-    : Directive1[Future[Form]] = {
-    Directive[Tuple1[Future[Form]]] { inner =>
-      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
-        val form = formdata.parts.mapAsync(1) { p =>
-          p.filename match {
-            case Some(fileName) =>
-              // Reserve the suffix: the original name ends the temp-file name, keeping its
-              // extension.
-              val targetPath = File.createTempFile(s"userfile_${p.name}_", fileName, rootDirectory)
-              p.entity.dataBytes.runWith(FileIO.toFile(targetPath)).map { result =>
-                if (result.count > 0) {
-                  Map[Name, FormField](p.name -> Left(FileInfo(fileName, targetPath, result.count)))
-                } else {
-                  // Nothing was written: drop the empty temp file instead of leaking it on disk.
-                  targetPath.delete()
-                  Map.empty[Name, FormField]
-                }
-              }
-            case None =>
-              p.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map { value =>
-                Map[Name, FormField](p.name -> Right(value.utf8String))
-              }
-          }
-        }.runFold(new Form(Map.empty[Name, FormField])) { (set, value) =>
-          new Form(set.fields ++ value)
-        }
-
-        inner(Tuple1(form))
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
deleted file mode 100644
index bf389f7..0000000
--- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.util
-
-import java.io.File
-import scala.concurrent.{ExecutionContext, Future}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.http.scaladsl.unmarshalling.Unmarshal
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{FileIO, Sink, Source}
-import spray.json.DefaultJsonProtocol._
-import spray.json.JsonFormat
-
-import io.gearpump.jarstore.FilePath
-import io.gearpump.util.FileDirective._
-import io.gearpump.util.FileServer.Port
-
-/**
- * A simple file server implemented with akka-http to store/fetch large
- * binary files.
- *
- * Routes:
- *  - `upload`: multipart upload; the file is stored under rootDirectory and its stored name
- *    is returned.
- *  - `download?file=<name>`: streams a file; only files inside rootDirectory are served.
- *  - `/`: a small HTML upload form.
- */
-class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory: File) {
-  import system.dispatcher
-  implicit val actorSystem = system
-  implicit val materializer = ActorMaterializer()
-  implicit def ec: ExecutionContext = system.dispatcher
-
-  val route: Route = {
-    path("upload") {
-      uploadFileTo(rootDirectory) { form =>
-        // Look for the first field that actually carries a file. Map iteration order is
-        // arbitrary, so inspecting only the head could miss the uploaded file.
-        val storedName = form.fields.values.collectFirst {
-          case Left(fileInfo) if fileInfo.file != null => fileInfo.file.getName
-        }
-
-        storedName match {
-          case Some(fileName) => complete(fileName)
-          case None => failWith(new Exception("File not found in the uploaded form"))
-        }
-      }
-    } ~
-      path("download") {
-        parameters("file") { file: String =>
-          val requested = new File(rootDirectory, file)
-          // The name comes straight from the request: refuse anything (e.g. "../../etc/x")
-          // that resolves outside rootDirectory.
-          if (isInsideRoot(requested)) {
-            downloadFile(requested)
-          } else {
-            complete(StatusCodes.Forbidden)
-          }
-        }
-      } ~
-      pathEndOrSingleSlash {
-        extractUri { uri =>
-          val upload = uri.withPath(Uri.Path("/upload")).toString()
-          val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`,
-            s"""
-            |
-            |<h2>Please specify a file to upload:</h2>
-            |<form action="$upload" enctype="multipart/form-data" method="post">
-            |<input type="file" name="datafile" size="40">
-            |</p>
-            |<div>
-            |<input type="submit" value="Submit">
-            |</div>
-            |</form>
-          """.stripMargin)
-          complete(entity)
-        }
-      }
-  }
-
-  private var connection: Future[ServerBinding] = null
-
-  /** Binds the routes to host:port and completes with the port actually bound. */
-  def start: Future[Port] = {
-    connection = Http().bindAndHandle(Route.handlerFlow(route), host, port)
-    connection.map(address => Port(address.localAddress.getPort))
-  }
-
-  /** Unbinds the server; a no-op if start was never called. */
-  def stop: Future[Unit] = {
-    if (connection == null) {
-      Future.successful(())
-    } else {
-      connection.flatMap(_.unbind())
-    }
-  }
-
-  /** True if `file` resolves (after `..` and symlinks) to a path strictly inside rootDirectory. */
-  private def isInsideRoot(file: File): Boolean = {
-    val root = rootDirectory.getCanonicalFile.toPath
-    val candidate = file.getCanonicalFile.toPath
-    candidate.startsWith(root) && candidate != root
-  }
-}
-
-object FileServer {
-
-  implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply)
-
-  /** Port the file server is bound to (relevant when it was started with port 0). */
-  case class Port(port: Int)
-
-  /**
-   * Client of [[io.gearpump.util.FileServer]]
-   *
-   * A non-success HTTP status from the server fails the returned future instead of being
-   * treated as a successful upload or download.
-   */
-  class Client(system: ActorSystem, host: String, port: Int) {
-
-    def this(system: ActorSystem, url: String) = {
-      this(system, Uri(url).authority.host.address(), Uri(url).authority.port)
-    }
-
-    private implicit val actorSystem = system
-    private implicit val materializer = ActorMaterializer()
-    private implicit val ec = system.dispatcher
-
-    val server = Uri(s"http://$host:$port")
-    val httpClient = Http(system).outgoingConnection(server.authority.host.address(),
-      server.authority.port)
-
-    /** Uploads `file` as a multipart form; completes with the server-side stored name. */
-    def upload(file: File): Future[FilePath] = {
-      val target = server.withPath(Path("/upload"))
-
-      val request = entity(file).map { entity =>
-        HttpRequest(HttpMethods.POST, uri = target, entity = entity)
-      }
-
-      val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
-      response.flatMap { reply =>
-        Unmarshal(reply).to[String].flatMap { body =>
-          if (reply.status.isSuccess()) {
-            Future.successful(FilePath(body))
-          } else {
-            // Without this check an error page would be returned as if it were a file path.
-            Future.failed(new Exception(
-              s"Failed to upload ${file.getName}: ${reply.status} $body"))
-          }
-        }
-      }
-    }
-
-    /** Downloads `remoteFile` from the server and writes it to `saveAs`. */
-    def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
-      val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
-      // Download file to local
-      val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
-      response.flatMap { reply =>
-        if (reply.status.isSuccess()) {
-          reply.entity.dataBytes.runWith(FileIO.toFile(saveAs)).map(_ => ())
-        } else {
-          // Drain the error body so the connection is released, then fail instead of
-          // writing the error page into saveAs.
-          reply.entity.dataBytes.runWith(Sink.ignore).flatMap { _ =>
-            Future.failed[Unit](new Exception(
-              s"Failed to download ${remoteFile.path}: ${reply.status}"))
-          }
-        }
-      }
-    }
-
-    /** Wraps `file` in a single-part multipart form under the field name "uploadfile". */
-    private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
-      val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
-        FileIO.fromFile(file, chunkSize = 100000))
-      val body = Source.single(
-        Multipart.FormData.BodyPart(
-          "uploadfile",
-          entity,
-          Map("filename" -> file.getName)))
-      val form = Multipart.FormData(body)
-
-      Marshal(form).to[RequestEntity]
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
new file mode 100644
index 0000000..9e55be6
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+
+/**
+ * Cluster bootup flow: messages a worker sends to the master to register itself
+ * and to report its resources.
+ */
+object WorkerToMaster {
+
+  /** When a worker is started, it sends RegisterNewWorker */
+  case object RegisterNewWorker
+
+  /**
+   * When a worker loses its connection to the master, it tries to register itself again
+   * with its old Id.
+   */
+  case class RegisterWorker(workerId: WorkerId)
+
+  /** Worker is responsible for broadcasting its current status to the master */
+  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
+}
+
+/** Messages delivered to a worker about its registration and resource-update status. */
+object MasterToWorker {
+
+  /** Master confirms receipt of RegisterNewWorker or RegisterWorker */
+  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
+
+  /**
+   * Worker has not received a reply from the master.
+   * Both fields default to null when no reason or cause is available.
+   */
+  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
+
+  /** Master is synced with worker on resource slots managed by current worker */
+  case object UpdateResourceSucceed
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
new file mode 100644
index 0000000..9bde4d1
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.embedded
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.{Config, ConfigValueFactory}
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
+import org.apache.gearpump.util.{LogUtil, Util}
+
+/**
+ * Creates an in-process cluster with a single worker.
+ *
+ * Call `start()` before `newClientContext`; call `stop()` to shut the cluster down.
+ *
+ * @param inputConfig base configuration; the Akka port, master list, same-process executor,
+ *                    metrics and actor-provider settings are overridden on top of it.
+ */
+class EmbeddedCluster(inputConfig: Config) {
+
+  private val workerCount: Int = 1
+  private var _master: ActorRef = null
+  private var _system: ActorSystem = null
+  private var _config: Config = null
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  /** Starts the actor system, a master and `workerCount` workers on a free local port. */
+  def start(): Unit = {
+    val port = Util.findFreePort().get
+    val akkaConf = getConfig(inputConfig, port)
+    _config = akkaConf
+    val system = ActorSystem(MASTER, akkaConf)
+
+    val master = system.actorOf(Props[MasterActor], MASTER)
+
+    0.until(workerCount).foreach { id =>
+      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+    }
+    this._master = master
+    this._system = system
+
+    LOG.info("=================================")
+    LOG.info("Local Cluster is started at: ")
+    LOG.info(s"                 127.0.0.1:$port")
+    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+  }
+
+  /** Overlays the local-cluster settings (port, master list, etc.) on `inputConfig`. */
+  private def getConfig(inputConfig: Config, port: Int): Config = {
+    val config = inputConfig.
+      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+      withValue(GEARPUMP_CLUSTER_MASTERS,
+        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
+        ConfigValueFactory.fromAnyRef(true)).
+      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
+      withValue("akka.actor.provider",
+        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+    config
+  }
+
+  /** A client bound to this cluster's config, actor system and master; requires `start()`. */
+  def newClientContext: ClientContext = {
+    ClientContext(_config, _system, _master)
+  }
+
+  /** Stops the master and terminates the actor system, blocking until it is down. */
+  def stop(): Unit = {
+    // Nothing to tear down if start() was never called (previously a NullPointerException).
+    if (_system != null) {
+      _system.stop(_master)
+      _system.terminate()
+      Await.result(_system.whenTerminated, Duration.Inf)
+    }
+  }
+}
+
+object EmbeddedCluster {
+
+  /** Builds an embedded cluster whose base configuration comes from `ClusterConfig.master()`. */
+  def apply(): EmbeddedCluster = new EmbeddedCluster(ClusterConfig.master())
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
new file mode 100644
index 0000000..98ec707
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import java.io.File
+import java.net.{URL, URLClassLoader}
+import java.util.jar.JarFile
+import scala.util.Try
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+
+/** Tool to submit an application jar to cluster */
+object AppSubmitter extends AkkaApp with ArgumentsParser {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val ignoreUnknownArgument = true
+
+  override val description = "Submit an application to Master by providing a jar"
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
+      defaultValue = Some("")),
+    "jar" -> CLIOption("<application>.jar", required = true),
+    "executors" -> CLIOption[Int]("number of executor to launch", required = false,
+      defaultValue = Some(1)),
+    "verbose" -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)),
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  /**
+   * Parses the command line, publishes the jar path / executor count / name prefix as system
+   * properties, then loads the application's main class from the jar and invokes it.
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    val config = parse(args)
+    if (null != config) {
+
+      val verbose = config.getBoolean("verbose")
+      if (verbose) {
+        LogUtil.verboseLogToConsole()
+      }
+
+      val jar = config.getString("jar")
+
+      // Set jar path to be submitted to cluster
+      System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+      System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
+
+      val namePrefix = config.getString("namePrefix")
+      if (namePrefix.nonEmpty) {
+        if (!Util.validApplicationName(namePrefix)) {
+          throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+        }
+        System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
+      }
+
+      val jarFile = new File(jar)
+
+      // Start main class
+      if (!jarFile.exists()) {
+        throw new Exception(s"jar $jar does not exist")
+      }
+
+      // toURI.toURL percent-encodes characters (spaces, '%', '#', ...) that a hand-built
+      // "file:" + path URL would leave malformed.
+      val jarUrl: URL = jarFile.toURI.toURL
+      val classLoader: URLClassLoader = new URLClassLoader(Array(jarUrl),
+        Thread.currentThread().getContextClassLoader())
+      val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
+
+      // Set to context classloader. ActorSystem pick context classloader in preference
+      Thread.currentThread().setContextClassLoader(classLoader)
+      val clazz = classLoader.loadClass(main)
+      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+      // `arguments` is passed as one varargs element, i.e. as main's String[] parameter.
+      mainMethod.invoke(null, arguments)
+    }
+  }
+
+  /**
+   * Chooses the main class: the first remaining argument if it names a loadable class,
+   * otherwise the jar manifest's Main-Class.
+   *
+   * @return (main class name, arguments to pass to it)
+   * @throws Exception if neither source yields a main class
+   */
+  private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
+    : (String, Array[String]) = {
+    if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) {
+      (remainArgs(0), remainArgs.drop(1))
+    } else {
+      mainClassFromManifest(jar) match {
+        case Some(mainClass) => (mainClass, remainArgs)
+        case None => throw new Exception("No main class specified")
+      }
+    }
+  }
+
+  /**
+   * Reads a non-empty Main-Class attribute from the jar manifest. A jar without a manifest
+   * yields None (getManifest returns null). The jar file handle is always closed.
+   */
+  private def mainClassFromManifest(jar: File): Option[String] = {
+    val jarFile = new JarFile(jar)
+    try {
+      Option(jarFile.getManifest)
+        .flatMap(manifest => Option(manifest.getMainAttributes.getValue("Main-Class")))
+        .filter(_.nonEmpty)
+    } finally {
+      jarFile.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
new file mode 100644
index 0000000..672fee6
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+object Gear {
+
+  /** Global option name: `-conf <file>` selects a custom configuration file for any command. */
+  val OPTION_CONFIG = "conf"
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  /** Sub-commands dispatched by name; other first arguments are passed through to MainRunner. */
+  val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
+    "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
+
+  /** Prints the supported sub-command names to stderr. */
+  def usage(): Unit = {
+    val keys = commands.keys.toList.sorted
+    // scalastyle:off println
+    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+    // scalastyle:on println
+  }
+
+  /** Runs a known sub-command, or hands `command` plus its arguments to MainRunner. */
+  private def executeCommand(command: String, commandArgs: Array[String]): Unit = {
+    commands.get(command) match {
+      case Some(app) => app.main(commandArgs)
+      case None => MainRunner.main(command +: commandArgs)
+    }
+  }
+
+  /** Entry point: strips `-conf <file>`, then dispatches the first remaining argument. */
+  def main(inputArgs: Array[String]): Unit = {
+    val (configFile, args) = extractConfig(inputArgs)
+    if (configFile != null) {
+      // Sets custom config file...
+      System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
+    }
+
+    if (args.length == 0) {
+      usage()
+    } else {
+      val command = args(0)
+      val commandArgs = args.drop(1)
+      executeCommand(command, commandArgs)
+    }
+  }
+
+  /**
+   * Removes every `-conf <file>` pair from the arguments.
+   *
+   * @return (the last config file given, or null if none; the remaining arguments in order)
+   * @throws IllegalArgumentException if `-conf` is the last argument and has no value
+   */
+  private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = {
+    val flag = s"-$OPTION_CONFIG"
+    var configFile: String = null
+    // Array builder keeps this linear; appending to a List with :+ is O(n) per item.
+    val remaining = Array.newBuilder[String]
+    var index = 0
+    while (index < inputArgs.length) {
+      val item = inputArgs(index)
+      if (item == flag) {
+        if (index + 1 >= inputArgs.length) {
+          throw new IllegalArgumentException(s"Option $flag requires a configuration file path")
+        }
+        index += 1
+        configFile = inputArgs(index)
+      } else {
+        remaining += item
+      }
+      index += 1
+    }
+    (configFile, remaining.result())
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
new file mode 100644
index 0000000..bf444a3
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Tool to query master info */
+object Info extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  override val description = "Query the Application list"
+
+  // scalastyle:off println
+  /** Prints every application known to the master; the client is closed even on failure. */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val client = ClientContext(akkaConf)
+    try {
+      val AppMastersData(appMasters) = client.listApps
+      Console.println("== Application Information ==")
+      Console.println("====================================")
+      appMasters.foreach { appData =>
+        Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " +
+          s"status: ${appData.status}, worker: ${appData.workerPath}")
+      }
+    } finally {
+      client.close()
+    }
+  }
+  // scalastyle:on println
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
new file mode 100644
index 0000000..17f6214
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Tool to kill an App */
+object Kill extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "appid" -> CLIOption("<application id>", required = true),
+    // For document purpose only, OPTION_CONFIG option is not used here.
+    // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  override val description = "Kill an application with application Id"
+
+  /** Asks the cluster to shut down the application given by `-appid`. */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    if (null != config) {
+      // Read the id before creating the client so a bad value cannot leak a client.
+      val appId = config.getInt("appid")
+      val client = ClientContext(akkaConf)
+      try {
+        LOG.info(s"Killing application $appId")
+        client.shutdown(appId)
+      } finally {
+        client.close()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
new file mode 100644
index 0000000..db71b7b
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
+
+/** Tool to start a local cluster: one Master actor and several Worker actors in one actor system. */
+object Local extends AkkaApp with ArgumentsParser {
+  override def akkaConfig: Config = ClusterConfig.master()
+
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] =
+    Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
+      "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
+        defaultValue = Some(2)))
+
+  override val description = "Start a local cluster"
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    // Load the LOCAL logging configuration first, then re-create LOG so it uses it.
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
+      LogUtil.getLogger(getClass)
+    }
+
+    val config = parse(args)
+    if (null != config) {
+      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
+    }
+  }
+
+  /**
+   * Starts a master and `workerCount` workers in a single actor system.
+   *
+   * Blocks until that actor system terminates.
+   *
+   * The configuration must list exactly one master (Constants.GEARPUMP_CLUSTER_MASTERS).
+   * That master's host must equal Constants.GEARPUMP_HOSTNAME.
+   * Otherwise an error is logged and nothing is started.
+   *
+   * @param workerCount number of Worker actors to create
+   * @param sameProcess when true, sets the system property `LOCAL` to "true"
+   * @param akkaConf base configuration; the actor system port comes from the configured master
+   */
+  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
+    if (sameProcess) {
+      LOG.info("Starting local in same process")
+      System.setProperty("LOCAL", "true")
+    }
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
+
+    // `||` short-circuits, so `masters.head` is only evaluated when there is exactly one master.
+    if (masters.size != 1 || masters.head.host != local) {
+      LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
+        s"with ${Constants.GEARPUMP_HOSTNAME}")
+    } else {
+
+      val hostPort = masters.head
+      implicit val system = ActorSystem(MASTER, akkaConf.
+        withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
+      )
+
+      val master = system.actorOf(Props[MasterActor], MASTER)
+      val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
+      LOG.info(s"Local master started at $masterPath")
+
+      (0 until workerCount).foreach { id =>
+        system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+      }
+
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
new file mode 100644
index 0000000..c6c9f10
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Tool to run any main class by providing a jar */
+object MainRunner extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    // For documentation purposes only; OPTION_CONFIG is not used here.
+    // OPTION_CONFIG is parsed by the parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  /**
+   * Loads the class named by `args(0)` through the context class loader.
+   *
+   * Invokes its static `main(Array[String])` with the remaining arguments.
+   * Logs an error and returns when no class name is given.
+   *
+   * @param akkaConf unused here
+   * @param args main class name followed by the arguments to pass to it
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    if (args.isEmpty) {
+      LOG.error("No main class specified, usage: <main class> [arguments...]")
+    } else {
+      val mainClazz = args(0)
+      val commandArgs = args.drop(1)
+
+      val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz)
+      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+      // `commandArgs` is passed as a single Object to the Java varargs `invoke`
+      // (no `: _*`), so the target `main` receives the whole array as its one argument.
+      mainMethod.invoke(null, commandArgs)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
new file mode 100644
index 0000000..f1b9bdf
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor._
+import akka.cluster.ClusterEvent._
+import akka.cluster.ddata.DistributedData
+import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
+import akka.cluster.{Cluster, Member, MemberStatus}
+import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
+import org.apache.gearpump.cluster.master.Master.MasterListUpdated
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
+
+/**
+ * Tool to start a Master daemon.
+ *
+ * The master actor system joins an Akka cluster whose seed nodes are all configured masters.
+ * The real Master actor is hosted by a cluster singleton (MasterWatcher).
+ */
+object Master extends AkkaApp with ArgumentsParser {
+
+  private var LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def akkaConfig: Config = ClusterConfig.master()
+
+  override val options: Array[(String, CLIOption[Any])] =
+    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
+      "port" -> CLIOption("<master port>", required = true))
+
+  override val description = "Start Master daemon"
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
+      LogUtil.getLogger(getClass)
+    }
+
+    val config = parse(args)
+    // Same guard as the other tools in this package: `parse` may yield null,
+    // in which case there is nothing to start.
+    if (null != config) {
+      master(config.getString("ip"), config.getInt("port"), akkaConf)
+    }
+  }
+
+  /** True when `master:port` is one of the configured master host:port entries. */
+  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
+    masters.exists { hostPort =>
+      hostPort == s"$master:$port"
+    }
+  }
+
+  /**
+   * Starts the master actor system on `ip:port` and blocks until it terminates.
+   *
+   * Exits the JVM with status -1 if `ip:port` is not a configured master.
+   */
+  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+
+    if (!verifyMaster(ip, port, masters)) {
+      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
+        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
+      System.exit(-1)
+    }
+
+    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
+    // Majority of the configured masters
+    val quorum = masterList.size() / 2 + 1
+    val masterConfig = akkaConf.
+      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
+      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
+      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
+        ConfigValueFactory.fromAnyRef(quorum))
+
+    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
+    val system = ActorSystem(MASTER, masterConfig)
+
+    val replicator = DistributedData(system).replicator
+    LOG.info(s"Replicator path: ${replicator.path}")
+
+    // Starts singleton manager
+    system.actorOf(ClusterSingletonManager.props(
+      singletonProps = Props(classOf[MasterWatcher], MASTER),
+      terminationMessage = PoisonPill,
+      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
+        .withRole(MASTER)),
+      name = SINGLETON_MANAGER)
+
+    // Start master proxy
+    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
+      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
+      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
+      // Master is created when there is a majority of machines started.
+      settings = ClusterSingletonProxySettings(system)
+        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+      name = MASTER
+    )
+
+    LOG.info(s"master proxy is started at ${masterProxy.path}")
+
+    installShutdownHook(system, masterProxy)
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  /**
+   * Registers a JVM shutdown hook that runs only if `system` is still running.
+   *
+   * The hook:
+   *  - stops the master proxy;
+   *  - leaves and downs this node in the cluster;
+   *  - waits up to 3 seconds for termination, then forces `system.terminate()`;
+   *  - joins the thread that installed the hook.
+   */
+  private def installShutdownHook(system: ActorSystem, masterProxy: ActorRef): Unit = {
+    val mainThread = Thread.currentThread()
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        if (!system.whenTerminated.isCompleted) {
+          LOG.info("Triggering shutdown hook....")
+
+          system.stop(masterProxy)
+          val cluster = Cluster(system)
+          cluster.leave(cluster.selfAddress)
+          cluster.down(cluster.selfAddress)
+          try {
+            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+          } catch {
+            // Best effort: fall through to terminate() below if the wait fails or times out.
+            case _: Exception =>
+          }
+          system.terminate()
+          mainThread.join()
+        }
+      }
+    })
+  }
+}
+
+/**
+ * Cluster singleton that hosts the Master actor.
+ *
+ * - Creates the Master actor once at least a quorum (majority of `akka.cluster.seed-nodes`) of
+ *   Up members with `role` is visible.
+ * - Sends the Master a MasterListUpdated with the current master members.
+ * - Leaves the cluster and terminates the actor system once the quorum is lost.
+ *
+ * @param role cluster role that identifies master nodes
+ */
+class MasterWatcher(role: String) extends Actor with ActorLogging {
+  import context.dispatcher
+
+  val cluster = Cluster(context.system)
+
+  val config = context.system.settings.config
+  val masters = config.getList("akka.cluster.seed-nodes")
+  val quorum = masters.size() / 2 + 1
+
+  val system = context.system
+
+  // Sorts by age, oldest first
+  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
+  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
+
+  // Initial behavior. preStart also switches to it, so a restarted actor starts over.
+  def receive: Receive = waitForInit
+
+  // Subscribes to MemberEvent, re-subscribe when restart
+  override def preStart(): Unit = {
+    cluster.subscribe(self, classOf[MemberEvent])
+    context.become(waitForInit)
+  }
+
+  override def postStop(): Unit = {
+    cluster.unsubscribe(self)
+  }
+
+  def matchingRole(member: Member): Boolean = member.hasRole(role)
+
+  /** Waits for the initial cluster snapshot, then either creates the Master or shuts down. */
+  def waitForInit: Receive = {
+    case state: CurrentClusterState =>
+      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
+        m.status == MemberStatus.Up && matchingRole(m))
+
+      if (!shutdownIfQuorumLost()) {
+        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
+        notifyMasterMembersChange(master)
+        context.become(waitForClusterEvent(master))
+      }
+  }
+
+  /** Tracks master members joining/leaving and keeps `master` informed. */
+  def waitForClusterEvent(master: ActorRef): Receive = {
+    case MemberUp(m) if matchingRole(m) =>
+      membersByAge += m
+      notifyMasterMembersChange(master)
+    case event: MemberEvent if isDeparture(event) && matchingRole(event.member) =>
+      val m = event.member
+      log.info(s"member removed $m")
+      membersByAge -= m
+      if (!shutdownIfQuorumLost()) {
+        notifyMasterMembersChange(master)
+      }
+  }
+
+  /** True for the member events that mean a member is leaving (exited or removed). */
+  private def isDeparture(event: MemberEvent): Boolean = event match {
+    case _: MemberExited | _: MemberRemoved => true
+    case _ => false
+  }
+
+  /**
+   * Triggers shutdown if fewer than `quorum` master members remain.
+   *
+   * On shutdown it switches to `waitForShutdown` and sends itself Shutdown.
+   *
+   * @return true if shutdown was triggered
+   */
+  private def shutdownIfQuorumLost(): Boolean = {
+    val lost = membersByAge.size < quorum
+    if (lost) {
+      log.info(s"We cannot get a quorum, $quorum, " +
+        s"shutting down...${membersByAge.iterator.mkString(",")}")
+      context.become(waitForShutdown)
+      self ! MasterWatcher.Shutdown
+    }
+    lost
+  }
+
+  /** Sends the current master members, oldest first, to `master`. */
+  private def notifyMasterMembersChange(master: ActorRef): Unit = {
+    val masters = membersByAge.toList.map { member =>
+      MasterNode(member.address.host.getOrElse("Unknown-Host"),
+        member.address.port.getOrElse(0))
+    }
+    master ! MasterListUpdated(masters)
+  }
+
+  /**
+   * Leaves the cluster and stops this actor.
+   *
+   * Then waits up to 3 seconds for the actor system to terminate before forcing termination.
+   */
+  def waitForShutdown: Receive = {
+    case MasterWatcher.Shutdown =>
+      cluster.unsubscribe(self)
+      cluster.leave(cluster.selfAddress)
+      context.stop(self)
+      system.scheduler.scheduleOnce(Duration.Zero) {
+        try {
+          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+        } catch {
+          // Best effort: fall through to terminate() below if the wait fails or times out.
+          case _: Exception =>
+        }
+        system.terminate()
+      }
+  }
+}
+
+object MasterWatcher {
+  /** Message a MasterWatcher sends to itself to begin shutting down. */
+  case object Shutdown
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
new file mode 100644
index 0000000..d721832
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/** Internal tool to restart an application from its current min clock (low watermark). */
+object Replay extends AkkaApp with ArgumentsParser {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "appid" -> CLIOption("<application id>", required = true),
+    // For documentation purposes only; OPTION_CONFIG is not used here.
+    // OPTION_CONFIG is parsed by the parent shell command "Gear" transparently.
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
+
+  override val description = "Replay the application from current min clock(low watermark)"
+
+  /**
+   * Parses the arguments and asks the cluster to replay the given application.
+   *
+   * The client is always closed, even if the replay request throws.
+   *
+   * @param akkaConf configuration used to create the [[ClientContext]]
+   * @param args command-line arguments (must contain `-appid`)
+   */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+
+    if (null != config) {
+      // Resolve the id first so a malformed value fails before a client is created.
+      val appId = config.getInt("appid")
+      val client = ClientContext(akkaConf)
+      try {
+        LOG.info(s"Replaying application $appId")
+        client.replayFromTimestampWindowTrailingEdge(appId)
+      } finally {
+        client.close()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
new file mode 100644
index 0000000..58a9dec
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, Props}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+
+/**
+ * Tool to start a worker daemon process.
+ *
+ * Each invocation creates an actor system with a random UUID name.
+ * It contains a MasterProxy for the configured masters and one Worker actor.
+ */
+object Worker extends AkkaApp with ArgumentsParser {
+  protected override def akkaConfig: Config = ClusterConfig.worker()
+
+  override val description = "Start a worker daemon"
+
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  private def uuid = java.util.UUID.randomUUID.toString
+
+  /** Starts the worker actor system and blocks until it terminates. */
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val workerId = uuid
+
+    // LOG is re-created only after the WORKER logging configuration is loaded. The log file
+    // name is reset by that call, so creating the logger earlier could leave an empty file.
+    LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
+    LOG = LogUtil.getLogger(getClass)
+
+    val system = ActorSystem(workerId, akkaConf)
+
+    // Each configured master entry is "host:port".
+    val masterHostPorts = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { entry =>
+      val parts = entry.split(":")
+      HostPort(parts(0), parts(1).toInt)
+    }
+
+    LOG.info(s"Trying to connect to masters ${masterHostPorts.mkString(",")}...")
+    val proxyName = s"masterproxy${system.name}"
+    val masterProxy = system.actorOf(MasterProxy.props(masterHostPorts), proxyName)
+
+    val workerName = classOf[WorkerActor].getSimpleName + workerId
+    system.actorOf(Props(classOf[WorkerActor], masterProxy), workerName)
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
\ No newline at end of file