Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/14 11:55:24 UTC

[3/4] incubator-gearpump git commit: [GEARPUMP-224] merge gearpump-daemon to gearpump-core

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
new file mode 100644
index 0000000..447b034
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.net.URL
+import java.util.concurrent.{Executors, TimeUnit}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceSucceed, UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
+import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
+import org.apache.gearpump.jarstore.JarStoreClient
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.util.ActorSystemBooter.Daemon
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util.{TimeOutScheduler, _}
+import org.slf4j.Logger
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Worker tracks the resources on a single machine, similar to the
+ * node manager in YARN.
+ *
+ * @param masterProxy used to resolve the master
+ */
+private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
+  private val systemConfig: Config = context.system.settings.config
+
+  private val address = ActorUtil.getFullPath(context.system, self.path)
+  private var resource = Resource.empty
+  private var allocatedResources = Map[ActorRef, Resource]()
+  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
+  private var id: WorkerId = WorkerId.unspecified
+  private val createdTime = System.currentTimeMillis()
+  private var masterInfo: MasterInfo = null
+  private var executorNameToActor = Map.empty[String, ActorRef]
+  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
+  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
+
+  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
+
+  private var totalSlots: Int = 0
+
+  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+  var historyMetricsService: Option[ActorRef] = None
+
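+  // Never used directly: preStart immediately switches the actor to
+  // waitForMasterConfirm via context.become.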
+  override def receive: Receive = null
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  def service: Receive =
+    appMasterMsgHandler orElse
+      clientMessageHandler orElse
+      metricsService orElse
+      terminationWatch(masterInfo.master) orElse
+      ActorUtil.defaultMsgHandler(self)
+
+  def metricsService: Receive = {
+    case query: QueryHistoryMetrics =>
+      if (historyMetricsService.isEmpty) {
+        // Returns empty metrics so that we don't hang the UI
+        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      } else {
+        historyMetricsService.get forward query
+      }
+  }
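
A hedged usage sketch of this handler: a client queries the worker's metrics
with the ask pattern. `worker` and the metrics path are illustrative, and it
assumes QueryHistoryMetrics can be built from just a path.

    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._

    implicit val timeout: Timeout = Timeout(5.seconds)
    // `worker` is an ActorRef to a running Worker actor (assumption).
    val reply: Future[HistoryMetrics] =
      (worker ? QueryHistoryMetrics("worker0")).mapTo[HistoryMetrics]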
+
+  private var metricsInitialized = false
+
+  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+  private def initializeMetrics(): Unit = {
+    // Registers jvm metrics
+    val metricsSetName = "worker" + WorkerId.render(id)
+    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
+
+    historyMetricsService = if (metricsEnabled) {
+      val historyMetricsService = {
+        context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
+      }
+
+      val metricsReportService = context.actorOf(Props(
+        new MetricsReporterService(Metrics(context.system))))
+      historyMetricsService.tell(ReportMetrics, metricsReportService)
+      Some(historyMetricsService)
+    } else {
+      None
+    }
+  }
+
+  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
+
+    // If the master gets disconnected, WorkerRegistered may be triggered multiple times.
+    case WorkerRegistered(id, masterInfo) =>
+      this.id = id
+
+      // Checks the flag so that we don't re-initialize the metrics when the worker
+      // re-registers itself.
+      if (!metricsInitialized) {
+        initializeMetrics()
+        metricsInitialized = true
+      }
+
+      this.masterInfo = masterInfo
+      timeoutTicker.cancel()
+      context.watch(masterInfo.master)
+      this.LOG = LogUtil.getLogger(getClass, worker = id)
+      LOG.info(s"Worker is registered. " +
+        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
+      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
+        resourceUpdateTimeoutMs, updateResourceTimeOut())
+      context.become(service)
+  }
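
The registration handshake above, sketched with a TestProbe standing in for
the master (in the style of the specs later in this commit). MasterInfo's
exact constructor is an assumption here; adjust to the real signature.

    import akka.actor.{ActorSystem, Props}
    import akka.testkit.TestProbe
    import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
    import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, ResourceUpdate}
    import org.apache.gearpump.cluster.master.Master.MasterInfo
    import org.apache.gearpump.cluster.worker.{Worker, WorkerId}

    val system = ActorSystem("handshake-demo")    // config omitted for brevity
    val probe = TestProbe()(system)
    val worker = system.actorOf(Props(classOf[Worker], probe.ref))

    probe.expectMsg(RegisterNewWorker)            // sent from the worker's preStart
    // MasterInfo is assumed to accept the master ref, with startTime defaulted.
    probe.send(worker, WorkerRegistered(WorkerId(0, 0L), MasterInfo(probe.ref)))
    probe.expectMsgType[ResourceUpdate]           // worker reports its slots back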
+
+  private def updateResourceTimeOut(): Unit = {
+    LOG.error(s"Update worker resource time out")
+  }
+
+  def appMasterMsgHandler: Receive = {
+    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
+      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      val executorToStop = executorNameToActor.get(actorName)
+      if (executorToStop.isDefined) {
+        LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
+          s"due to: $reason")
+        executorToStop.get.forward(shutdown)
+      } else {
+        LOG.error(s"Cannot find executor $actorName, ignore this message")
+        sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
+      }
+    case launch: LaunchExecutor =>
+      LOG.info(s"$launch")
+      if (resource < launch.resource) {
+        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
+      } else {
+        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
+
+        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
+          jarStoreClient, executorProcLauncher))
+        executorNameToActor += actorName -> executor
+
+        resource = resource - launch.resource
+        allocatedResources = allocatedResources + (executor -> launch.resource)
+
+        reportResourceToMaster()
+        executorsInfo += executor ->
+          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
+        context.watch(executor)
+      }
+    case UpdateResourceFailed(reason, ex) =>
+      LOG.error(reason)
+      context.stop(self)
+    case UpdateResourceSucceed =>
+      LOG.info(s"Update resource succeed")
+    case GetWorkerData(workerId) =>
+      val aliveFor = System.currentTimeMillis() - createdTime
+      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+      val userDir = System.getProperty("user.dir")
+      sender ! WorkerData(WorkerSummary(
+        id, "active",
+        address,
+        aliveFor,
+        logDir,
+        executorsInfo.values.toArray,
+        totalSlots,
+        resource.slots,
+        userDir,
+        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+        resourceManagerContainerId = systemConfig.getString(
+          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
+        historyMetricsConfig = getHistoryMetricsConfig)
+      )
+    case ChangeExecutorResource(appId, executorId, usedResource) =>
+      for (executor <- executorActorRef(appId, executorId);
+        allocatedResource <- allocatedResources.get(executor)) {
+
+        allocatedResources += executor -> usedResource
+        resource = resource + allocatedResource - usedResource
+        reportResourceToMaster()
+
+        if (usedResource == Resource(0)) {
+          executorsInfo -= executor
+          allocatedResources -= executor
+          // Stops the executor if there is no resource bound to it.
+          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
+          executor ! ShutdownExecutor(appId, executorId,
+            "Shutdown executor because the resource used is zero")
+        }
+      }
+  }
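
The slot bookkeeping above is plain arithmetic on Resource. A minimal
walk-through, assuming Resource(n) simply wraps a slot count as the +, - and
< operators used here suggest:

    var free = Resource(4)              // worker configured with 4 slots
    val granted = Resource(3)
    free = free - granted               // after LaunchExecutor: 1 slot free
    val used = Resource(1)
    free = free + granted - used        // after ChangeExecutorResource: 3 slots free
    // If used were Resource(0), the executor would be shut down instead.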
+
+  private def reportResourceToMaster(): Unit = {
+    sendMsgWithTimeOutCallBack(masterInfo.master,
+      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
+  }
+
+  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
+    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+    executorNameToActor.get(actorName)
+  }
+
+  def clientMessageHandler: Receive = {
+    case QueryWorkerConfig(workerId) =>
+      if (this.id == workerId) {
+        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+      } else {
+        sender ! WorkerConfig(ConfigFactory.empty)
+      }
+  }
+
+  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
+    repeatActionUtil(
+      seconds = timeOutSeconds,
+      action = () => {
+        masterProxy ! RegisterWorker(workerId)
+      },
+      onTimeout = () => {
+        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
+          s"seconds, abort and kill the worker...")
+        self ! PoisonPill
+      })
+  }
+
+  def terminationWatch(master: ActorRef): Receive = {
+    case Terminated(actor) =>
+      if (actor.compareTo(master) == 0) {
+        // The parent master is down. Try to re-register to a new master; if that
+        // keeps failing, retryRegisterWorker kills the worker to free its resources.
+        LOG.info(s"Master cannot be contacted, finding a new master ...")
+        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
+      } else if (ActorUtil.isChildActorPath(self, actor)) {
+        // One executor is down.
+        LOG.info(s"Executor is down ${getExecutorName(actor)}")
+
+        val allocated = allocatedResources.get(actor)
+        if (allocated.isDefined) {
+          resource = resource + allocated.get
+          executorsInfo -= actor
+          allocatedResources = allocatedResources - actor
+          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
+            resourceUpdateTimeoutMs, updateResourceTimeOut())
+        }
+      }
+  }
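
Both branches above rely on Akka deathwatch: context.watch registers interest
in an actor, and a Terminated message is delivered when it stops. A minimal
standalone illustration (a sketch, not part of this commit):

    import akka.actor.{Actor, ActorRef, Terminated}

    class Watcher(target: ActorRef) extends Actor {
      context.watch(target)
      override def receive: Receive = {
        case Terminated(`target`) =>
          // React to the peer's death: re-register (master case) or
          // reclaim resources (executor case), as the Worker does above.
          context.stop(self)
      }
    }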
+
+  private def getExecutorName(actorRef: ActorRef): Option[String] = {
+    executorNameToActor.find(_._2 == actorRef).map(_._1)
+  }
+
+  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
+    val launcherClazz = Class.forName(
+      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
+    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
+      .asInstanceOf[ExecutorProcessLauncher]
+  }
+
+  import context.dispatcher
+  override def preStart(): Unit = {
+    LOG.info(s"RegisterNewWorker")
+    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
+    this.resource = Resource(totalSlots)
+    masterProxy ! RegisterNewWorker
+    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
+  }
+
+  private def registerTimeoutTicker(seconds: Int): Cancellable = {
+    repeatActionUtil(seconds, () => Unit, () => {
+      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
+        s"abort and kill the worker...")
+      self ! PoisonPill
+    })
+  }
+
+  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
+    : Cancellable = {
+    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
+      Duration(2, TimeUnit.SECONDS))(action())
+    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
+    new Cancellable {
+      def cancel(): Boolean = {
+        val result1 = cancelTimeout.cancel()
+        val result2 = cancelSuicide.cancel()
+        result1 && result2
+      }
+
+      def isCancelled: Boolean = {
+        cancelTimeout.isCancelled && cancelSuicide.isCancelled
+      }
+    }
+  }
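
repeatActionUtil composes two scheduler handles behind one Cancellable: a
ticker that fires the action every 2 seconds, and a one-shot timeout.
Cancelling the returned handle (as waitForMasterConfirm does once
WorkerRegistered arrives) stops both. A usage sketch from inside the Worker:

    // Retries an action every 2 seconds, giving up after 30 seconds.
    val handle = repeatActionUtil(
      seconds = 30,
      action = () => println("retrying ..."),
      onTimeout = () => println("giving up"))

    // On success, stop the ticker and the pending timeout together:
    handle.cancel()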
+
+  override def postStop(): Unit = {
+    LOG.info(s"Worker is going down....")
+    ioPool.shutdown()
+    context.system.terminate()
+  }
+}
+
+private[cluster] object Worker {
+
+  case class ExecutorResult(result: Try[Int])
+
+  class ExecutorWatcher(
+      launch: LaunchExecutor,
+      masterInfo: MasterInfo,
+      ioPool: ExecutionContext,
+      jarStoreClient: JarStoreClient,
+      procLauncher: ExecutorProcessLauncher) extends Actor {
+    import launch.{appId, executorId, resource}
+
+    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
+
+    val executorConfig: Config = {
+      val workerConfig = context.system.settings.config
+
+      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
+        Option(jvmConfig.executorAkkaConfig)
+      }.getOrElse(ConfigFactory.empty())
+
+      resolveExecutorConfig(workerConfig, submissionConfig)
+    }
+
+    // For some config entries the worker config takes priority; for others, the user's
+    // application submission config does.
+    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
+      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
+        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
+        .withoutPath(GEARPUMP_HOME)
+        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
+        .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
+        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
+        // Falls back to workerConfig
+        .withFallback(workerConfig)
+
+      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
+      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
+      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
+        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
+        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
+      } else {
+        config
+      }
+
+      // Excludes reference.conf and JVM properties.
+      ClusterConfig.filterOutDefaultConfig(updatedConf)
+    }
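
The precedence rules reduce to Typesafe Config's withFallback: keys removed
from the submission config can only come from the worker config, while every
remaining submission key wins. A self-contained illustration (key names are
illustrative):

    import com.typesafe.config.ConfigFactory

    val workerConf = ConfigFactory.parseString(
      "gearpump.hostname = worker-host\ngearpump.worker-only = 1")
    val submissionConf = ConfigFactory.parseString(
      "gearpump.hostname = user-host\ngearpump.user-only = 2")

    val merged = submissionConf.withoutPath("gearpump.hostname")
      .withFallback(workerConf)

    assert(merged.getString("gearpump.hostname") == "worker-host") // worker wins
    assert(merged.getInt("gearpump.user-only") == 2)               // submission keys kept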
+
+    implicit val executorService = ioPool
+
+    private val executorHandler = {
+      val ctx = launch.executorJvmConfig
+
+      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
+        new ExecutorHandler {
+          val exitPromise = Promise[Int]()
+          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
+
+          override def destroy(): Unit = {
+            context.stop(app)
+          }
+          override def exitValue: Future[Int] = {
+            exitPromise.future
+          }
+        }
+      } else {
+        createProcess(ctx)
+      }
+    }
+
+    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
+
+      val process = Future {
+        val jarPath = ctx.jar.map { appJar =>
+          val tempFile = File.createTempFile(appJar.name, ".jar")
+          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
+          val file = new URL("file:" + tempFile)
+          file.getFile
+        }
+
+        val configFile = {
+          val configFile = File.createTempFile("gearpump", ".conf")
+          ClusterConfig.saveConfig(executorConfig, configFile)
+          val file = new URL("file:" + configFile)
+          file.getFile
+        }
+
+        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
+          ctx.classPath.map(path => expandEnviroment(path)) ++
+          jarPath.map(Array(_)).getOrElse(Array.empty[String])
+
+        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
+        val logArgs = List(
+          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
+          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
+          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
+          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
+        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
+
+        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
+
+        // Remote debug executor process
+        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
+        val remoteDebugConfig = if (remoteDebugFlag) {
+          val availablePort = Util.findFreePort().get
+          List(
+            "-Xdebug",
+            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
+            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
+        val verboseGCConfig = if (verboseGCFlag) {
+          List(
+            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
+            "-verbose:gc",
+            "-XX:+PrintGCDetails",
+            "-XX:+PrintGCDateStamps",
+            "-XX:+PrintTenuringDistribution",
+            "-XX:+PrintGCApplicationConcurrentTime",
+            "-XX:+PrintGCApplicationStoppedTime"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val ipv4 = List(s"-D${PREFER_IPV4}=true")
+
+        val options = ctx.jvmArguments ++ username ++
+          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
+
+        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
+          options, classPath, ctx.mainClass, ctx.arguments)
+
+        ProcessInfo(process, jarPath, configFile)
+      }
+
+      new ExecutorHandler {
+
+        var destroyed = false
+
+        override def destroy(): Unit = {
+          LOG.info(s"Destroy executor process ${ctx.mainClass}")
+          if (!destroyed) {
+            destroyed = true
+            process.foreach { info =>
+              info.process.destroy()
+              info.jarPath.foreach(new File(_).delete())
+              new File(info.configFile).delete()
+            }
+          }
+        }
+
+        override def exitValue: Future[Int] = {
+          process.flatMap { info =>
+            val exit = info.process.exitValue()
+            if (exit == 0) {
+              Future.successful(0)
+            } else {
+              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
+              s"error summary: ${info.process.logger.error}"))
+            }
+          }
+        }
+      }
+    }
+
+    private def expandEnviroment(path: String): String = {
+      // TODO: extend this to support more environment variables.
+      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
+    }
+
+    override def preStart(): Unit = {
+      executorHandler.exitValue.onComplete { value =>
+        procLauncher.cleanProcess(appId, executorId)
+        val result = ExecutorResult(value)
+        self ! result
+      }
+    }
+
+    override def postStop(): Unit = {
+      executorHandler.destroy()
+    }
+
+    // The folders are under ${GEARPUMP_HOME}
+    val daemonPathPattern = List("lib" + File.separator + "yarn")
+
+    override def receive: Receive = {
+      case ShutdownExecutor(appId, executorId, reason: String) =>
+        executorHandler.destroy()
+        sender ! ShutdownExecutorSucceed(appId, executorId)
+        context.stop(self)
+      case ExecutorResult(executorResult) =>
+        executorResult match {
+          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
+          case Failure(e) => LOG.error("Executor exit with errors", e)
+        }
+        context.stop(self)
+    }
+
+    private def getFormatedTime(timestamp: Long): String = {
+      val datePattern = "yyyy-MM-dd-HH-mm"
+      val format = new java.text.SimpleDateFormat(datePattern)
+      format.format(timestamp)
+    }
+
+    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
+      classPath.filterNot(matchDaemonPattern(_))
+    }
+
+    private def matchDaemonPattern(path: String): Boolean = {
+      daemonPathPattern.exists(path.contains(_))
+    }
+  }
+
+  trait ExecutorHandler {
+    def destroy(): Unit
+    def exitValue: Future[Int]
+  }
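
Both the in-JVM and the process-backed executors implement this two-method
contract. For completeness, a minimal stub implementation, e.g. for tests
(a sketch, not part of this commit):

    import scala.concurrent.{Future, Promise}

    val stub = new ExecutorHandler {
      private val exit = Promise[Int]()
      override def destroy(): Unit = exit.trySuccess(0) // simulate a clean exit
      override def exitValue: Future[Int] = exit.future
    }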
+
+  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
+
+  /**
+   * Starts the executor in the same JVM as the worker.
+   */
+  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
+    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
+    private val exitCode = 0
+
+    override val supervisorStrategy =
+      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
+        case ex: Throwable =>
+          LOG.error(s"system $name stopped ", ex)
+          exit.failure(ex)
+          Stop
+      }
+
+    override def postStop(): Unit = {
+      if (!exit.isCompleted) {
+        exit.success(exitCode)
+      }
+    }
+  }
+}
\ No newline at end of file
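
Taken together, a worker boots by handing it an actor that can resolve the
master. A hedged wiring sketch, using MasterProxy the same way MainSpec does
further down in this commit; workerConfig and the master address are
assumptions:

    import akka.actor.{ActorSystem, Props}
    import org.apache.gearpump.cluster.master.MasterProxy
    import org.apache.gearpump.cluster.worker.Worker
    import org.apache.gearpump.transport.HostPort

    val system = ActorSystem("worker-system", workerConfig)
    val masterProxy = system.actorOf(
      MasterProxy.props(List(HostPort("127.0.0.1", 3000))), "masterProxy")
    system.actorOf(Props(classOf[Worker], masterProxy), "worker")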

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
new file mode 100644
index 0000000..0a22245
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.TestActorRef
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.master.Master
+import org.apache.gearpump.cluster.worker.Worker
+import org.apache.gearpump.util.Constants
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+class MiniCluster {
+  private val mockMasterIP = "127.0.0.1"
+
+  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
+    withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
+
+  val (mockMaster, worker) = {
+    val master = system.actorOf(Props(classOf[Master]), "master")
+    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
+
+    // Waits until the worker registers itself with the master
+    waitUtilWorkerIsRegistered(master)
+    (master, worker)
+  }
+
+  def launchActor(props: Props): TestActorRef[Actor] = {
+    TestActorRef(props)
+  }
+
+  private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
+    while (!isWorkerRegistered(master)) {}
+  }
+
+  private def isWorkerRegistered(master: ActorRef): Boolean = {
+    import scala.concurrent.duration._
+    implicit val dispatcher = system.dispatcher
+
+    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
+
+    val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]
+
+    // Waits until the worker is registered.
+    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
+    workers.workers.size > 0
+  }
+
+  def shutDown(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
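
A usage sketch for this helper; the constructor blocks until the worker has
registered, so both refs are usable immediately:

    val cluster = new MiniCluster
    try {
      val master = cluster.mockMaster
      val worker = cluster.worker
      // ... drive test traffic against master and worker here ...
    } finally {
      cluster.shutDown()
    }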

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
new file mode 100644
index 0000000..f9b0762
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.{TestUtil, _}
+import org.apache.gearpump.util.LogUtil
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.util.Success
+
+class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  var kvService: TestProbe = null
+  var haService: TestProbe = null
+  var appLauncher: TestProbe = null
+  var appManager: ActorRef = null
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    kvService = TestProbe()(getActorSystem)
+    appLauncher = TestProbe()(getActorSystem)
+
+    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+      new DummyAppMasterLauncherFactory(appLauncher))))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "AppManager" should "handle AppMaster message correctly" in {
+    val appMaster = TestProbe()(getActorSystem)
+    val appId = 1
+
+    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
+    appMaster.send(appManager, register)
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    appMaster.send(appManager, ActivateAppMaster(appId))
+    appMaster.expectMsgType[AppMasterActivated]
+  }
+
+  "DataStoreService" should "support Put and Get" in {
+    val appMaster = TestProbe()(getActorSystem)
+    appMaster.send(appManager, SaveAppData(0, "key", 1))
+    kvService.expectMsgType[PutKV]
+    kvService.reply(PutKVSuccess)
+    appMaster.expectMsg(AppDataSaved)
+
+    appMaster.send(appManager, GetAppData(0, "key"))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess("key", 1))
+    appMaster.expectMsg(GetAppDataResult("key", 1))
+  }
+
+  "AppManager" should "support application submission and shutdown" in {
+    testClientSubmission(withRecover = false)
+  }
+
+  "AppManager" should "support application submission and recover if appmaster dies" in {
+    LOG.info("=================testing recover==============")
+    testClientSubmission(withRecover = true)
+  }
+
+  "AppManager" should "handle client message correctly" in {
+    val mockClient = TestProbe()(getActorSystem)
+    mockClient.send(appManager, ShutdownApplication(1))
+    assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
+
+    mockClient.send(appManager, ResolveAppId(1))
+    assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
+
+    mockClient.send(appManager, AppMasterDataRequest(1))
+    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+  }
+
+  "AppManager" should "reject the application submission if the app name already existed" in {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, submit)
+    assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
+  }
+
+  def testClientSubmission(withRecover: Boolean): Unit = {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    kvService.expectMsgType[PutKV]
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, ResolveAppId(appId))
+    client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
+
+    client.send(appManager, AppMastersDataRequest)
+    client.expectMsgType[AppMastersData]
+
+    client.send(appManager, AppMasterDataRequest(appId, false))
+    client.expectMsgType[AppMasterData]
+
+    if (!withRecover) {
+      client.send(appManager, ShutdownApplication(appId))
+      client.expectMsg(ShutdownApplicationResult(Success(appId)))
+    } else {
+      // Do recovery
+      getActorSystem.stop(appMaster.ref)
+      kvService.expectMsgType[GetKV]
+      val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
+      kvService.reply(GetKVSuccess(APP_STATE, appState))
+      appLauncher.expectMsg(LauncherStarted(appId))
+    }
+  }
+}
+
+class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
+  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+    Props(new DummyAppMasterLauncher(test, appId))
+  }
+}
+
+class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
+  test.ref ! LauncherStarted(appId)
+  
+  override def receive: Receive = {
+    case any: Any => test.ref forward any
+  }
+}
+
+case class LauncherStarted(appId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
new file mode 100644
index 0000000..d3e739f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.Props
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.InMemoryKVService
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+class InMemoryKVServiceSpec
+  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.MASTER_CONFIG
+
+  "KVService" should "get, put, delete correctly" in {
+    val system = getActorSystem
+    val kvService = system.actorOf(Props(new InMemoryKVService()))
+    val group = "group"
+
+    val client = TestProbe()(system)
+
+    client.send(kvService, PutKV(group, "key", 1))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, PutKV(group, "key", 2))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, GetKV(group, "key"))
+    client.expectMsg(GetKVSuccess("key", 2))
+
+    client.send(kvService, DeleteKVGroup(group))
+
+    // After DeleteKVGroup, the service no longer accepts Get and Put messages for this group.
+    client.send(kvService, GetKV(group, "key"))
+    client.expectNoMsg(3.seconds)
+
+    client.send(kvService, PutKV(group, "key", 3))
+    client.expectNoMsg(3.seconds)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
new file mode 100644
index 0000000..2166976
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.Properties
+
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
+import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
+import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, LogUtil, Util}
+import org.scalatest._
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "Worker" should "register worker address to master when started." in {
+
+    val masterReceiver = createMockMaster()
+
+    val tempTestConf = convertTestConf(getHost, getPort)
+
+    val options = Array(
+      s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
+      s"-D${PREFER_IPV4}=true"
+    ) ++ getMasterListOption()
+
+    val worker = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Worker),
+      Array.empty)
+
+    try {
+      masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
+
+      tempTestConf.delete()
+    } finally {
+      worker.destroy()
+    }
+  }
+
+  "Master" should "accept worker RegisterNewWorker when started" in {
+    val worker = TestProbe()(getActorSystem)
+
+    val host = "127.0.0.1"
+    val port = Util.findFreePort().get
+
+    val properties = new Properties()
+    properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
+    properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
+    val masterConfig = ConfigFactory.parseProperties(properties)
+      .withFallback(TestUtil.MASTER_CONFIG)
+    Future {
+      Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
+    }
+
+    val masterProxy = getActorSystem.actorOf(
+      MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+
+    worker.send(masterProxy, RegisterNewWorker)
+    worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+  }
+
+  "Info" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
+    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
+  }
+
+  "Kill" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Kill.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
+    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
+  }
+
+  "Replay" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Replay.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
+    masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ReplayApplicationResult(Success(0)))
+  }
+
+  "Local" should "be started without exception" in {
+    val port = Util.findFreePort().get
+    val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
+      s"-D${PREFER_IPV4}=true")
+
+    val local = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Local),
+      Array.empty)
+
+    def retry(times: Int)(fn: => Boolean): Boolean = {
+
+      LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
+
+      val result = fn
+      if (result || times <= 0) {
+        result
+      } else {
+        Thread.sleep(1000)
+        retry(times - 1)(fn)
+      }
+    }
+
+    try {
+      assert(retry(10)(isPortUsed("127.0.0.1", port)),
+        "local is not started successfully, as port is not used " + port)
+    } finally {
+      local.destroy()
+    }
+  }
+
+  "Gear" should "support app|info|kill|shell|replay" in {
+
+    val commands = Array("app", "info", "kill", "shell", "replay")
+
+    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
+
+    for (command <- commands) {
+      assert(Try(Gear.main(Array("-noexist"))).isFailure,
+        "pass unknown option, throw, command: " + command)
+    }
+
+    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
+
+    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
+    assert(tryThis.isFailure, "unknown command, throw")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
new file mode 100644
index 0000000..b48fc2a
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.TestUtil
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class MasterWatcherSpec extends FlatSpec with Matchers {
+  def config: Config = TestUtil.MASTER_CONFIG
+
+  "MasterWatcher" should "kill itself when can not get a quorum" in {
+    val system = ActorSystem("ForMasterWatcher", config)
+
+    val actorWatcher = TestProbe()(system)
+
+    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
+    actorWatcher watch masterWatcher
+    actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
new file mode 100644
index 0000000..8a3d7d1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+
+class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
+  val appId = 0
+  val workerId1: WorkerId = WorkerId(1, 0L)
+  val workerId2: WorkerId = WorkerId(2, 0L)
+  val mockAppMaster = TestProbe()
+  val mockWorker1 = TestProbe()
+  val mockWorker2 = TestProbe()
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The scheduler" should {
+    "update resource only when the worker is registered" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
+      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
+        s"registered into master"))
+    }
+
+    "drop application's resource requests when the application is removed" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      mockAppMaster.expectNoMsg(5.seconds)
+    }
+  }
+
+  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
+    left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+  }
+
+  "The resource request with higher priority" should {
+    "be handled first" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The resource request which delivered earlier" should {
+    "be handled first if the priorities are the same" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different relaxation" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
+      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      val request3 = ResourceRequest(
+        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+
+      expect = ResourceAllocated(Array(
+        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      // We have to manually update the resource on each worker
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different executor number" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      // By default, the request requires only one executor
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations2.allocations.length == 1)
+      assert(allocations2.allocations.head.resource == Resource(20))
+
+      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations3.allocations.length == 3)
+      assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+      // The total available resources cannot satisfy the request with the required executor number
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+      val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations4.allocations.length == 2)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+      // When new resources are available, the remaining request will be satisfied
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+      val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations5.allocations.length == 1)
+      assert(allocations5.allocations.forall(_.resource == Resource(20)))
+    }
+  }
+}
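
For reference, the per-executor split these assertions rely on is plain
integer division; a hedged sketch of just that arithmetic (not the
scheduler's actual implementation, which also weighs priority, relaxation
and per-worker availability):

    import org.apache.gearpump.cluster.scheduler.Resource

    // Illustrative only: Resource(30) over 2 executors -> 2 x Resource(15),
    // Resource(24) over 3 executors -> 3 x Resource(8), as asserted above.
    def evenSplit(slots: Int, executorNum: Int): Seq[Resource] =
      Seq.fill(executorNum)(Resource(slots / executorNum))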

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
new file mode 100644
index 0000000..e0233f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import akka.actor.{ActorSystem, PoisonPill, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+import org.scalatest._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  val appId = 1
+  val workerId: WorkerId = WorkerId(1, 0L)
+  val executorId = 1
+  var masterProxy: TestProbe = null
+  var mockMaster: TestProbe = null
+  var client: TestProbe = null
+  val workerSlots = 50
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    mockMaster = TestProbe()(getActorSystem)
+    masterProxy = TestProbe()(getActorSystem)
+    client = TestProbe()(getActorSystem)
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "The new started worker" should {
+    "kill itself if no response from Master after registering" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+      mockMaster.expectTerminated(worker, 60.seconds)
+    }
+  }
+
+  "Worker" should {
+    "init its resource from the gearpump config" in {
+      val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
+        withFallback(TestUtil.DEFAULT_CONFIG)
+      val workerSystem = ActorSystem("WorkerSystem", config)
+      val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
+
+      worker.tell(
+        UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
+      mockMaster.expectTerminated(worker, 5.seconds)
+      workerSystem.terminate()
+      Await.result(workerSystem.whenTerminated, Duration.Inf)
+    }
+  }
+
+  "Worker" should {
+    "update its remaining resource when launching and shutting down executors" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
+      masterProxy.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+
+      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      // The actor path the ActorSystemBooter would report back to;
+      // it is not needed in this test
+      val reportBack = "dummy"
+      val executionContext = ExecutorJVMConfig(Array.empty[String],
+        getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
+        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
+        username = "user")
+
+      // Test LaunchExecutor
+      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
+        mockMaster.ref)
+      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
+
+      worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
+
+      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
+
+      // Test terminationWatch
+      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+      client.expectMsg(ShutdownExecutorSucceed(1, 1))
+
+      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
+      client.expectMsg(ShutdownExecutorFailed(
+        s"Can not find executor ${executorId + 1} for app $appId"))
+
+      mockMaster.ref ! PoisonPill
+      masterProxy.expectMsg(RegisterWorker(workerId))
+    }
+  }
+}
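
The slot arithmetic this spec walks through (100 -> 95 -> 98 -> 100) is
simple bookkeeping over the worker's total slots. A minimal sketch with
hypothetical names (not the Worker's actual internals):

    // Tracks free slots the way the assertions above expect.
    class SlotBook(total: Int) {
      private var used = Map.empty[Int, Int] // executorId -> slots held

      def free: Int = total - used.values.sum

      // LaunchExecutor: rejected when the request exceeds the free slots.
      def launch(executorId: Int, slots: Int): Boolean =
        slots <= free && { used += executorId -> slots; true }

      // ChangeExecutorResource: re-bases what the executor holds.
      def resize(executorId: Int, slots: Int): Unit =
        used += executorId -> slots

      // ShutdownExecutor / termination: the slots are returned.
      def release(executorId: Int): Unit = used -= executorId
    }

    val book = new SlotBook(100)
    book.launch(1, 5) // free == 95
    book.resize(1, 2) // free == 98
    book.release(1)   // free == 100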

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
deleted file mode 100644
index 9e55be6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.worker.WorkerId
-
-/**
- * Cluster Bootup Flow
- */
-object WorkerToMaster {
-
-  /** When a worker is started, it sends RegisterNewWorker */
-  case object RegisterNewWorker
-
-  /** When a worker loses connection with the master, it registers itself again with its old id. */
-  case class RegisterWorker(workerId: WorkerId)
-
-  /** The worker is responsible for broadcasting its current status to the master */
-  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
-}
-
-object MasterToWorker {
-
-  /** Master confirms receipt of RegisterNewWorker or RegisterWorker */
-  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
-
-  /** The worker has not received a reply from the master */
-  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
-
-  /** The master is in sync with the worker on the resource slots managed by that worker */
-  case object UpdateResourceSucceed
-}
\ No newline at end of file
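
These messages moved into gearpump-core with this commit. The bootup flow
they define, sketched as a minimal worker-side actor (illustrative only;
the real logic lives in core's Worker.scala):

    import akka.actor.{Actor, ActorRef}
    import org.apache.gearpump.cluster.MasterToWorker._
    import org.apache.gearpump.cluster.WorkerToMaster._
    import org.apache.gearpump.cluster.scheduler.Resource

    class RegistrationSketch(master: ActorRef, slots: Int) extends Actor {
      // Step 1: announce ourselves as a brand-new worker.
      override def preStart(): Unit = master ! RegisterNewWorker

      def receive: Receive = {
        // Step 2: the master assigns a WorkerId.
        case WorkerRegistered(workerId, _) =>
          // Step 3: report the slots this node manages; the master
          // answers with UpdateResourceSucceed once it is in sync.
          master ! ResourceUpdate(self, workerId, Resource(slots))
        case UpdateResourceFailed(_, _) =>
          context.stop(self) // give up, as WorkerSpec above expects
      }
    }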

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
deleted file mode 100644
index 9bde4d1..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.embedded
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{Config, ConfigValueFactory}
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
-import org.apache.gearpump.util.{LogUtil, Util}
-
-/**
- * Creates an in-process cluster with a single worker
- */
-class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  def start(): Unit = {
-    val port = Util.findFreePort().get
-    val akkaConf = getConfig(inputConfig, port)
-    _config = akkaConf
-    val system = ActorSystem(MASTER, akkaConf)
-
-    val master = system.actorOf(Props[MasterActor], MASTER)
-
-    0.until(workerCount).foreach { id =>
-      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-    }
-    this._master = master
-    this._system = system
-
-    LOG.info("=================================")
-    LOG.info("Local Cluster is started at: ")
-    LOG.info(s"                 127.0.0.1:$port")
-    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
-  }
-
-  private def getConfig(inputConfig: Config, port: Int): Config = {
-    val config = inputConfig.
-      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(GEARPUMP_CLUSTER_MASTERS,
-        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
-      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
-        ConfigValueFactory.fromAnyRef(true)).
-      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
-      withValue("akka.actor.provider",
-        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
-    config
-  }
-
-  def newClientContext: ClientContext = {
-    ClientContext(_config, _system, _master)
-  }
-
-  def stop(): Unit = {
-    _system.stop(_master)
-    _system.terminate()
-    Await.result(_system.whenTerminated, Duration.Inf)
-  }
-}
-
-object EmbeddedCluster {
-  def apply(): EmbeddedCluster = {
-    new EmbeddedCluster(ClusterConfig.master())
-  }
-}
\ No newline at end of file
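
Callers drive the class above through a short lifecycle; for example:

    // Spin up the one-worker, in-process cluster, obtain a client
    // context for submitting applications, then tear everything down.
    val cluster = EmbeddedCluster()
    cluster.start()
    val context = cluster.newClientContext
    // ... submit and manage applications via `context` ...
    cluster.stop()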

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
deleted file mode 100644
index db71b7b..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
-
-object Local extends AkkaApp with ArgumentsParser {
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
-      "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
-        defaultValue = Some(2)))
-
-  override val description = "Start a local cluster"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    if (null != config) {
-      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
-    }
-  }
-
-  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
-    if (sameProcess) {
-      LOG.info("Starting local in same process")
-      System.setProperty("LOCAL", "true")
-    }
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
-      .asScala.flatMap(Util.parseHostList)
-    val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
-
-    if (masters.size != 1 || masters.head.host != local) {
-      LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} setting does not match " +
-        s"${Constants.GEARPUMP_HOSTNAME}")
-    } else {
-
-      val hostPort = masters.head
-      implicit val system = ActorSystem(MASTER, akkaConf.
-        withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
-      )
-
-      val master = system.actorOf(Props[MasterActor], MASTER)
-      val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
-
-      0.until(workerCount).foreach { id =>
-        system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-      }
-
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-}
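
Given the options above, the launcher takes -workernum and -sameprocess;
the programmatic equivalent of starting four workers is roughly
(illustrative wiring only):

    // Same effect as `local -workernum 4`: one master plus four
    // workers, using the master-side cluster configuration.
    val akkaConf = ClusterConfig.master()
    Local.local(workerCount = 4, sameProcess = false, akkaConf = akkaConf)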

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
deleted file mode 100644
index f1b9bdf..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.DistributedData
-import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
-import akka.cluster.{Cluster, Member, MemberStatus}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
-import org.apache.gearpump.cluster.master.Master.MasterListUpdated
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
-
-object Master extends AkkaApp with ArgumentsParser {
-
-  private var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
-      "port" -> CLIOption("<master port>", required = true))
-
-  override val description = "Start Master daemon"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    master(config.getString("ip"), config.getInt("port"), akkaConf)
-  }
-
-  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
-    masters.exists { hostPort =>
-      hostPort == s"$master:$port"
-    }
-  }
-
-  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
-
-    if (!verifyMaster(ip, port, masters)) {
-      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
-        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
-      System.exit(-1)
-    }
-
-    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
-    val quorum = masterList.size() / 2 + 1
-    val masterConfig = akkaConf.
-      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
-      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
-      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
-        ConfigValueFactory.fromAnyRef(quorum))
-
-    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
-    val system = ActorSystem(MASTER, masterConfig)
-
-    val replicator = DistributedData(system).replicator
-    LOG.info(s"Replicator path: ${replicator.path}")
-
-    // Starts singleton manager
-    val singletonManager = system.actorOf(ClusterSingletonManager.props(
-      singletonProps = Props(classOf[MasterWatcher], MASTER),
-      terminationMessage = PoisonPill,
-      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
-        .withRole(MASTER)),
-      name = SINGLETON_MANAGER)
-
-    // Starts the master proxy
-    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
-      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
-      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
-      // Master is created once a majority of the machines have started.
-      settings = ClusterSingletonProxySettings(system)
-        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
-      name = MASTER
-    )
-
-    LOG.info(s"master proxy is started at ${masterProxy.path}")
-
-    val mainThread = Thread.currentThread()
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        if (!system.whenTerminated.isCompleted) {
-          LOG.info("Triggering shutdown hook....")
-
-          system.stop(masterProxy)
-          val cluster = Cluster(system)
-          cluster.leave(cluster.selfAddress)
-          cluster.down(cluster.selfAddress)
-          try {
-            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-          } catch {
-            case ex: Exception => // Ignore
-          }
-          system.terminate()
-          mainThread.join()
-        }
-      }
-    })
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
-
-class MasterWatcher(role: String) extends Actor with ActorLogging {
-  import context.dispatcher
-
-  val cluster = Cluster(context.system)
-
-  val config = context.system.settings.config
-  val masters = config.getList("akka.cluster.seed-nodes")
-  val quorum = masters.size() / 2 + 1
-
-  val system = context.system
-
-  // Sorts by age, oldest first
-  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
-  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
-
-  def receive: Receive = Actor.emptyBehavior // replaced with waitForInit in preStart
-
-  // Subscribes to MemberEvent; re-subscribes on restart
-  override def preStart(): Unit = {
-    cluster.subscribe(self, classOf[MemberEvent])
-    context.become(waitForInit)
-  }
-  override def postStop(): Unit = {
-    cluster.unsubscribe(self)
-  }
-
-  def matchingRole(member: Member): Boolean = member.hasRole(role)
-
-  def waitForInit: Receive = {
-    case state: CurrentClusterState => {
-      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
-        m.status == MemberStatus.Up && matchingRole(m))
-
-      if (membersByAge.size < quorum) {
-        log.info(s"We cannot get a quorum, $quorum, " +
-          s"shutting down...${membersByAge.iterator.mkString(",")}")
-        context.become(waitForShutdown)
-        self ! MasterWatcher.Shutdown
-      } else {
-        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
-        notifyMasterMembersChange(master)
-        context.become(waitForClusterEvent(master))
-      }
-    }
-  }
-
-  def waitForClusterEvent(master: ActorRef): Receive = {
-    case MemberUp(m) if matchingRole(m) => {
-      membersByAge += m
-      notifyMasterMembersChange(master)
-    }
-    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
-      mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
-      log.info(s"member removed ${mEvent.member}")
-      val m = mEvent.member
-      membersByAge -= m
-      if (membersByAge.size < quorum) {
-        log.info(s"We cannot get a quorum, $quorum, " +
-          s"shutting down...${membersByAge.iterator.mkString(",")}")
-        context.become(waitForShutdown)
-        self ! MasterWatcher.Shutdown
-      } else {
-        notifyMasterMembersChange(master)
-      }
-    }
-  }
-
-  private def notifyMasterMembersChange(master: ActorRef): Unit = {
-    val masters = membersByAge.toList.map{ member =>
-      MasterNode(member.address.host.getOrElse("Unknown-Host"),
-        member.address.port.getOrElse(0))
-    }
-    master ! MasterListUpdated(masters)
-  }
-
-  def waitForShutdown: Receive = {
-    case MasterWatcher.Shutdown => {
-      cluster.unsubscribe(self)
-      cluster.leave(cluster.selfAddress)
-      context.stop(self)
-      system.scheduler.scheduleOnce(Duration.Zero) {
-        try {
-          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-        } catch {
-          case ex: Exception => // Ignore
-        }
-        system.terminate()
-      }
-    }
-  }
-}
-
-object MasterWatcher {
-  object Shutdown
-}
-
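
Both the deleted main and MasterWatcher size the quorum as n / 2 + 1 over
the configured masters, i.e. a strict majority:

    // Integer division, as in the code above: the Master singleton
    // survives only while a majority of configured masters is up.
    Seq(1, 2, 3, 4, 5).map(n => n -> (n / 2 + 1))
    // -> List((1,1), (2,2), (3,2), (4,3), (5,3))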

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
deleted file mode 100644
index 58a9dec..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to start a worker daemon process */
-object Worker extends AkkaApp with ArgumentsParser {
-  protected override def akkaConfig = ClusterConfig.worker()
-
-  override val description = "Start a worker daemon"
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  private def uuid = java.util.UUID.randomUUID.toString
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val id = uuid
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
-      // Delay creating the LOG instance to avoid producing an empty log file,
-      // since we reset the log file name here
-      LogUtil.getLogger(getClass)
-    }
-
-    val system = ActorSystem(id, akkaConf)
-
-    val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address =>
-      val hostAndPort = address.split(":")
-      HostPort(hostAndPort(0), hostAndPort(1).toInt)
-    }
-
-    LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...")
-    val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}")
-
-    system.actorOf(Props(classOf[WorkerActor], masterProxy),
-      classOf[WorkerActor].getSimpleName + id)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
\ No newline at end of file
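
The master-list parsing above expects host:port entries; for example
(hypothetical hostnames):

    import org.apache.gearpump.transport.HostPort

    // Entries from gearpump.cluster.masters, e.g. "node1:3000":
    val masters = List("node1:3000", "node2:3000").map { address =>
      val hostAndPort = address.split(":")
      HostPort(hostAndPort(0), hostAndPort(1).toInt)
    }
    // -> List(HostPort(node1,3000), HostPort(node2,3000))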