Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/14 11:55:22 UTC

[1/4] incubator-gearpump git commit: [GEARPUMP-224] merge gearpump-daemon to gearpump-core

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master a01809b25 -> c3d5eb63f


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
deleted file mode 100644
index 325a484..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.concurrent.duration._
-
-import akka.actor.Props
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.master.InMemoryKVService._
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-
-class InMemoryKVServiceSpec
-  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  override def config: Config = TestUtil.MASTER_CONFIG
-
-  "KVService" should "get, put, delete correctly" in {
-    val system = getActorSystem
-    val kvService = system.actorOf(Props(new InMemoryKVService()))
-    val group = "group"
-
-    val client = TestProbe()(system)
-
-    client.send(kvService, PutKV(group, "key", 1))
-    client.expectMsg(PutKVSuccess)
-
-    client.send(kvService, PutKV(group, "key", 2))
-    client.expectMsg(PutKVSuccess)
-
-    client.send(kvService, GetKV(group, "key"))
-    client.expectMsg(GetKVSuccess("key", 2))
-
-    client.send(kvService, DeleteKVGroup(group))
-
-    // After DeleteKVGroup, the service no longer accepts Get or Put messages for this group.
-    client.send(kvService, GetKV(group, "key"))
-    client.expectNoMsg(3.seconds)
-
-    client.send(kvService, PutKV(group, "key", 3))
-    client.expectNoMsg(3.seconds)
-  }
-}

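For reference, the removed spec exercised the InMemoryKVService message protocol:
PutKV answers PutKVSuccess, GetKV answers GetKVSuccess with the latest value, and
DeleteKVGroup invalidates the group. A minimal sketch of the same interaction via
the ask pattern (assuming only the message types shown above; illustrative, not
part of this commit):

    import akka.actor.{ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._
    import org.apache.gearpump.cluster.master.InMemoryKVService
    import org.apache.gearpump.cluster.master.InMemoryKVService._

    object KVSketch extends App {
      implicit val timeout: Timeout = Timeout(5.seconds)
      val system = ActorSystem("kv-sketch")
      val kvService = system.actorOf(Props(new InMemoryKVService()))

      // Later puts to the same (group, key) overwrite earlier values.
      Await.result(kvService ? PutKV("group", "key", 1), 5.seconds)       // PutKVSuccess
      Await.result(kvService ? PutKV("group", "key", 2), 5.seconds)       // PutKVSuccess
      println(Await.result(kvService ? GetKV("group", "key"), 5.seconds)) // GetKVSuccess("key", 2)

      // After DeleteKVGroup, Get and Put for this group are no longer answered.
      kvService ! DeleteKVGroup("group")
      system.terminate()
    }
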
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
deleted file mode 100644
index e82dff3..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
-import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-
-class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
-  with WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
-  val appId = 0
-  val workerId1: WorkerId = WorkerId(1, 0L)
-  val workerId2: WorkerId = WorkerId(2, 0L)
-  val mockAppMaster = TestProbe()
-  val mockWorker1 = TestProbe()
-  val mockWorker2 = TestProbe()
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The scheduler" should {
-    "update resource only when the worker is registered" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
-      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
-        s"registered into master"))
-    }
-
-    "drop application's resource requests when the application is removed" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-      mockAppMaster.expectNoMsg(5.seconds)
-    }
-  }
-
-  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
-    left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
-  }
-
-  "The resource request with higher priority" should {
-    "be handled first" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
-      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
-
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-
-      var expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-    }
-  }
-
-  "The resource request which delivered earlier" should {
-    "be handled first if the priorities are the same" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-
-      var expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-    }
-  }
-
-  "The PriorityScheduler" should {
-    "handle the resource request with different relaxation" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
-      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
-
-      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-
-      var expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      val request3 = ResourceRequest(
-        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
-      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-
-      expect = ResourceAllocated(Array(
-        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
-        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-
-      // We have to manually update the resource on each worker
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
-      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
-      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
-
-      expect = ResourceAllocated(
-        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
-      mockAppMaster.expectMsgPF(5.seconds) {
-        case request: ResourceAllocated if sameElement(request, expect) => Unit
-      }
-    }
-  }
-
-  "The PriorityScheduler" should {
-    "handle the resource request with different executor number" in {
-      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-
-      // By default, the request requires only one executor
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
-      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
-      val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations2.allocations.length == 1)
-      assert(allocations2.allocations.head.resource == Resource(20))
-
-      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
-      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-      val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations3.allocations.length == 3)
-      assert(allocations3.allocations.forall(_.resource == Resource(8)))
-
-      // The total available resource cannot satisfy the request with the given executor number
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
-      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
-      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
-      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
-      val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations4.allocations.length == 2)
-      assert(allocations4.allocations.forall(_.resource == Resource(20)))
-
-      // When new resources are available, the remaining request will be satisfied
-      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
-      val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
-      assert(allocations5.allocations.length == 1)
-      assert(allocations5.allocations.forall(_.resource == Resource(20)))
-    }
-  }
-}

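The spec above pins down two guarantees of the PriorityScheduler: requests with
higher priority are served first, and requests with equal priority are served in
arrival order. A sketch of that ordering in isolation (the queue below is
illustrative only, not the scheduler's actual internal structure):

    import scala.collection.mutable

    final case class Pending(priority: Int, arrival: Long, slots: Int)

    // Highest priority first; earliest arrival first when priorities tie.
    implicit val pendingOrdering: Ordering[Pending] =
      Ordering.by[Pending, (Int, Long)](p => (p.priority, -p.arrival))

    val queue = mutable.PriorityQueue(
      Pending(priority = 0 /* LOW    */, arrival = 1, slots = 40),
      Pending(priority = 1 /* NORMAL */, arrival = 2, slots = 20),
      Pending(priority = 2 /* HIGH   */, arrival = 3, slots = 30))

    while (queue.nonEmpty) {
      println(queue.dequeue()) // HIGH(30), then NORMAL(20), then LOW(40)
    }
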
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
deleted file mode 100644
index bf25057..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.worker
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, PoisonPill, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest._
-
-import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
-import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
-import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
-import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
-import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
-
-class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  val appId = 1
-  val workerId: WorkerId = WorkerId(1, 0L)
-  val executorId = 1
-  var masterProxy: TestProbe = null
-  var mockMaster: TestProbe = null
-  var client: TestProbe = null
-  val workerSlots = 50
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-    mockMaster = TestProbe()(getActorSystem)
-    masterProxy = TestProbe()(getActorSystem)
-    client = TestProbe()(getActorSystem)
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "The new started worker" should {
-    "kill itself if no response from Master after registering" in {
-      val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
-      mockMaster watch worker
-      mockMaster.expectMsg(RegisterNewWorker)
-      mockMaster.expectTerminated(worker, 60.seconds)
-    }
-  }
-
-  "Worker" should {
-    "init its resource from the gearpump config" in {
-      val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
-        withFallback(TestUtil.DEFAULT_CONFIG)
-      val workerSystem = ActorSystem("WorkerSystem", config)
-      val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
-      mockMaster watch worker
-      mockMaster.expectMsg(RegisterNewWorker)
-
-      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
-
-      worker.tell(
-        UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
-      mockMaster.expectTerminated(worker, 5.seconds)
-      workerSystem.terminate()
-      Await.result(workerSystem.whenTerminated, Duration.Inf)
-    }
-  }
-
-  "Worker" should {
-    "update its remaining resource when launching and shutting down executors" in {
-      val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
-      masterProxy.expectMsg(RegisterNewWorker)
-
-      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
-
-      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
-      // This is an actor path which the ActorSystemBooter will report back to;
-      // it is not needed in this test.
-      val reportBack = "dummy"
-      val executionContext = ExecutorJVMConfig(Array.empty[String],
-        getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
-        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
-        username = "user")
-
-      // Test LaunchExecutor
-      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
-        mockMaster.ref)
-      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
-
-      worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
-
-      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
-
-      // Test terminationWatch
-      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
-      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
-      client.expectMsg(ShutdownExecutorSucceed(1, 1))
-
-      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
-      client.expectMsg(ShutdownExecutorFailed(
-        s"Can not find executor ${executorId + 1} for app $appId"))
-
-      mockMaster.ref ! PoisonPill
-      masterProxy.expectMsg(RegisterWorker(workerId))
-    }
-  }
-}

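The resource bookkeeping this spec checks is plain slot arithmetic, with every
step reported to the master via ResourceUpdate: launching an executor subtracts
its resource (100 - 5 = 95), ChangeExecutorResource re-bases the executor's
share (95 + 5 - 2 = 98), and shutting the executor down returns whatever it held
(98 + 2 = 100). A sketch of that invariant, using only the numbers above:

    var remaining = 100 // free worker slots
    var held = 0        // slots held by the executor

    def launch(slots: Int): Unit = { remaining -= slots; held = slots }        // 95
    def rebase(slots: Int): Unit = { remaining += held - slots; held = slots } // 98
    def shutdown(): Unit = { remaining += held; held = 0 }                     // 100

    launch(5); rebase(2); shutdown()
    assert(remaining == 100 && held == 0)
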
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
index 84dec70..2988f5b 100644
--- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -20,7 +20,6 @@ package org.apache.gearpump.redis
 import java.nio.charset.Charset
 
 object RedisMessage {
-
   private def toBytes(strings: List[String]): List[Array[Byte]] =
     strings.map(string => string.getBytes(Charset.forName("UTF8")))
 
@@ -48,11 +47,10 @@ object RedisMessage {
      * @param latitude
      * @param member
      */
-    case class GEOADD(key: Array[Byte], longitude: Double,
-                      latitude: Double, member: Array[Byte]) {
-      def this(key: String, longitude: Double,
-               latitude: Double, member: String) =
+    case class GEOADD(key: Array[Byte], longitude: Double, latitude: Double, member: Array[Byte]) {
+      def this(key: String, longitude: Double, latitude: Double, member: String) = {
         this(toBytes(key), longitude, latitude, toBytes(member))
+      }
     }
 
   }
@@ -66,7 +64,9 @@ object RedisMessage {
      * @param field
      */
     case class HDEL(key: Array[Byte], field: Array[Byte]) {
-      def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+      def this(key: String, field: String) = {
+        this(toBytes(key), toBytes(field))
+      }
     }
 
     /**
@@ -77,8 +77,9 @@ object RedisMessage {
      * @param increment
      */
     case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
-      def this(key: String, field: String, increment: Long) =
+      def this(key: String, field: String, increment: Long) = {
         this(toBytes(key), toBytes(field), increment)
+      }
     }
 
     /**
@@ -89,8 +90,9 @@ object RedisMessage {
      * @param increment
      */
     case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
-      def this(key: String, field: String, increment: Float) =
+      def this(key: String, field: String, increment: Float) = {
         this(toBytes(key), toBytes(field), increment)
+      }
     }
 
 
@@ -102,8 +104,9 @@ object RedisMessage {
      * @param value
      */
     case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
-      def this(key: String, field: String, value: String) =
+      def this(key: String, field: String, value: String) = {
         this(toBytes(key), toBytes(field), toBytes(value))
+      }
     }
 
     /**
@@ -114,8 +117,9 @@ object RedisMessage {
      * @param value
      */
     case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
-      def this(key: String, field: String, value: String) =
+      def this(key: String, field: String, value: String) = {
         this(toBytes(key), toBytes(field), toBytes(value))
+      }
     }
 
   }
@@ -142,8 +146,9 @@ object RedisMessage {
      * @param value
      */
     case class LPUSH(key: Array[Byte], value: Array[Byte]) {
-
-      def this(key: String, value: String) = this(key, toBytes(value))
+      def this(key: String, value: String) = {
+        this(key, toBytes(value))
+      }
     }
 
     /**
@@ -153,7 +158,9 @@ object RedisMessage {
      * @param value
      */
     case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -164,7 +171,9 @@ object RedisMessage {
      * @param value
      */
     case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
-      def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+      def this(key: String, index: Long, value: String) = {
+        this(toBytes(key), index, toBytes(value))
+      }
     }
 
     /**
@@ -174,8 +183,9 @@ object RedisMessage {
      * @param value
      */
     case class RPUSH(key: Array[Byte], value: Array[Byte]) {
-
-      def this(key: String, value: String) = this(key, toBytes(value))
+      def this(key: String, value: String) = {
+        this(key, toBytes(value))
+      }
     }
 
     /**
@@ -185,7 +195,9 @@ object RedisMessage {
      * @param value
      */
     case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
   }
@@ -198,8 +210,9 @@ object RedisMessage {
      * @param message
      */
     case class DEL(message: Array[Byte]) {
-
-      def this(message: String) = this(toBytes(message))
+      def this(message: String) = {
+        this(toBytes(message))
+      }
     }
 
     /**
@@ -208,7 +221,9 @@ object RedisMessage {
      * @param key
      */
     case class EXPIRE(key: Array[Byte], seconds: Int) {
-      def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+      def this(key: String, seconds: Int) = {
+        this(toBytes(key), seconds)
+      }
     }
 
     /**
@@ -218,7 +233,9 @@ object RedisMessage {
      * @param timestamp
      */
     case class EXPIREAT(key: Array[Byte], timestamp: Long) {
-      def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+      def this(key: String, timestamp: Long) = {
+        this(toBytes(key), timestamp)
+      }
     }
 
     /**
@@ -230,9 +247,11 @@ object RedisMessage {
      * @param database
      * @param timeout
      */
-    case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
-      def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+    case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte],
+        database: Int, timeout: Int) {
+      def this(host: String, port: Int, key: String, database: Int, timeout: Int) = {
         this(toBytes(host), port, toBytes(key), database, timeout)
+      }
     }
 
     /**
@@ -242,7 +261,9 @@ object RedisMessage {
      * @param db
      */
     case class MOVE(key: Array[Byte], db: Int) {
-      def this(key: String, db: Int) = this(toBytes(key), db)
+      def this(key: String, db: Int) = {
+        this(toBytes(key), db)
+      }
     }
 
     /**
@@ -251,7 +272,9 @@ object RedisMessage {
      * @param key
      */
     case class PERSIST(key: Array[Byte]) {
-      def this(key: String) = this(toBytes(key))
+      def this(key: String) = {
+        this(toBytes(key))
+      }
     }
 
     /**
@@ -261,7 +284,9 @@ object RedisMessage {
      * @param milliseconds
      */
     case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
-      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+      def this(key: String, milliseconds: Long) = {
+        this(toBytes(key), milliseconds)
+      }
     }
 
     /**
@@ -271,7 +296,9 @@ object RedisMessage {
      * @param timestamp
      */
     case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
-      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+      def this(key: String, milliseconds: Long) = {
+        this(toBytes(key), milliseconds)
+      }
     }
 
     /**
@@ -281,7 +308,9 @@ object RedisMessage {
      * @param newKey
      */
     case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
-      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+      def this(key: String, newKey: String) = {
+        this(toBytes(key), toBytes(newKey))
+      }
     }
 
     /**
@@ -291,7 +320,9 @@ object RedisMessage {
      * @param newKey
      */
     case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
-      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+      def this(key: String, newKey: String) = {
+        this(toBytes(key), toBytes(newKey))
+      }
     }
 
   }
@@ -306,8 +337,9 @@ object RedisMessage {
      * @param members
      */
     case class SADD(key: Array[Byte], members: Array[Byte]) {
-
-      def this(key: String, members: String) = this(key, toBytes(members))
+      def this(key: String, members: String) = {
+        this(key, toBytes(members))
+      }
     }
 
 
@@ -319,8 +351,9 @@ object RedisMessage {
      * @param member
      */
     case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
-      def this(source: String, destination: String, member: String) =
+      def this(source: String, destination: String, member: String) = {
         this(toBytes(source), toBytes(destination), toBytes(member))
+      }
     }
 
 
@@ -331,8 +364,9 @@ object RedisMessage {
      * @param member
      */
     case class SREM(key: Array[Byte], member: Array[Byte]) {
-
-      def this(key: String, member: String) = this(key, toBytes(member))
+      def this(key: String, member: String) = {
+        this(key, toBytes(member))
+      }
     }
 
   }
@@ -346,7 +380,9 @@ object RedisMessage {
      * @param value
      */
     case class APPEND(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -355,7 +391,9 @@ object RedisMessage {
      * @param key
      */
     case class DECR(key: Array[Byte]) {
-      def this(key: String) = this(toBytes(key))
+      def this(key: String) = {
+        this(toBytes(key))
+      }
     }
 
     /**
@@ -365,7 +403,9 @@ object RedisMessage {
      * @param decrement
      */
     case class DECRBY(key: Array[Byte], decrement: Int) {
-      def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+      def this(key: String, decrement: Int) = {
+        this(toBytes(key), decrement)
+      }
     }
 
     /**
@@ -374,7 +414,9 @@ object RedisMessage {
      * @param key
      */
     case class INCR(key: Array[Byte]) {
-      def this(key: String) = this(toBytes(key))
+      def this(key: String) = {
+        this(toBytes(key))
+      }
     }
 
     /**
@@ -384,7 +426,9 @@ object RedisMessage {
      * @param increment
      */
     case class INCRBY(key: Array[Byte], increment: Int) {
-      def this(key: String, increment: Int) = this(toBytes(key), increment)
+      def this(key: String, increment: Int) = {
+        this(toBytes(key), increment)
+      }
     }
 
     /**
@@ -394,7 +438,9 @@ object RedisMessage {
      * @param increment
      */
     case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
-      def this(key: String, increment: Number) = this(toBytes(key), increment)
+      def this(key: String, increment: Number) = {
+        this(toBytes(key), increment)
+      }
     }
 
 
@@ -405,7 +451,9 @@ object RedisMessage {
      * @param value
      */
     case class SET(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -416,7 +464,9 @@ object RedisMessage {
      * @param value
      */
     case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
-      def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+      def this(key: String, offset: Long, value: String) = {
+        this(toBytes(key), offset, toBytes(value))
+      }
     }
 
     /**
@@ -427,7 +477,9 @@ object RedisMessage {
      * @param value
      */
     case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
-      def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+      def this(key: String, seconds: Int, value: String) = {
+        this(toBytes(key), seconds, toBytes(value))
+      }
     }
 
     /**
@@ -437,7 +489,9 @@ object RedisMessage {
      * @param value
      */
     case class SETNX(key: Array[Byte], value: Array[Byte]) {
-      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+      def this(key: String, value: String) = {
+        this(toBytes(key), toBytes(value))
+      }
     }
 
     /**
@@ -448,9 +502,9 @@ object RedisMessage {
      * @param value
      */
     case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
-      def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+      def this(key: String, offset: Int, value: String) = {
+        this(toBytes(key), offset, toBytes(value))
+      }
     }
-
   }
-
 }

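Every auxiliary constructor reformatted above follows the same pattern: the
String overload UTF-8-encodes its arguments and delegates to the primary
Array[Byte] constructor. A self-contained sketch of the pattern (toBytes here
stands in for the private helper in RedisMessage):

    import java.nio.charset.Charset

    def toBytes(s: String): Array[Byte] = s.getBytes(Charset.forName("UTF8"))

    case class SET(key: Array[Byte], value: Array[Byte]) {
      def this(key: String, value: String) = {
        this(toBytes(key), toBytes(value))
      }
    }

    val msg = new SET("greeting", "hello") // both fields are now UTF-8 byte arrays
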
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
index 3f75949..36a9fe3 100644
--- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -32,20 +32,20 @@ import redis.clients.jedis.Jedis
 import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
 
 /**
-  * Save message in Redis Instance
-  *
-  * @param host
-  * @param port
-  * @param timeout
-  * @param database
-  * @param password
-  */
+ * Save message in Redis Instance
+ *
+ * @param host
+ * @param port
+ * @param timeout
+ * @param database
+ * @param password
+ */
 class RedisSink(
-                    host: String = DEFAULT_HOST,
-                    port: Int = DEFAULT_PORT,
-                    timeout: Int = DEFAULT_TIMEOUT,
-                    database: Int = DEFAULT_DATABASE,
-                    password: String = "") extends DataSink {
+    host: String = DEFAULT_HOST,
+    port: Int = DEFAULT_PORT,
+    timeout: Int = DEFAULT_TIMEOUT,
+    database: Int = DEFAULT_DATABASE,
+    password: String = "") extends DataSink {
 
   private val LOG = LogUtil.getLogger(getClass)
   @transient private lazy val client = new Jedis(host, port, timeout)
@@ -59,7 +59,6 @@ class RedisSink(
   }
 
   override def write(message: Message): Unit = {
-
     message.msg match {
       // GEO
       case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)

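Functionally RedisSink is unchanged by the reformat: it lazily opens a Jedis
client and, in write(), pattern-matches the Message payload onto the
RedisMessage case classes, issuing the matching Jedis call (e.g. GEOADD ->
client.geoadd(...)). A usage sketch; the Message constructor and the DataSink
lifecycle (open() is invoked by the framework before write()) are assumptions
from Gearpump's sink API, not shown in this diff:

    import org.apache.gearpump.Message
    import org.apache.gearpump.redis.RedisSink

    // Defaults (DEFAULT_HOST, DEFAULT_PORT, ...) come from redis.clients.jedis.Protocol.
    val sink = new RedisSink(host = "127.0.0.1", port = 6379)

    // Any payload that is one of the RedisMessage case classes is translated to
    // the corresponding Jedis call. (Payload import omitted: the enclosing group
    // objects are not visible in this diff.)
    def push(payload: AnyRef): Unit = sink.write(Message(payload))
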
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 4552a64..40b5743 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -140,15 +140,6 @@ object Build extends sbt.Build {
     publishArtifact in Test := false
   )
 
-  val daemonDependencies = Seq(
-    libraryDependencies ++= Seq(
-      "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
-      "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
-      "commons-logging" % "commons-logging" % commonsLoggingVersion,
-      "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion
-    )
-  )
-
   val coreDependencies = Seq(
     libraryDependencies ++= Seq(
       "org.slf4j" % "slf4j-api" % slf4jVersion,
@@ -171,6 +162,10 @@ object Build extends sbt.Build {
       "com.typesafe.akka" %% "akka-remote" % akkaVersion
         exclude("io.netty", "netty"),
 
+      "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
+      "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
+      "commons-logging" % "commons-logging" % commonsLoggingVersion,
+      "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion,
       "com.typesafe.akka" %% "akka-actor" % akkaVersion,
       "com.typesafe.akka" %% "akka-agent" % akkaVersion,
       "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
@@ -256,7 +251,7 @@ object Build extends sbt.Build {
     id = "gearpump",
     base = file("."),
     settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting)
-      .aggregate(shaded, core, daemon, streaming, services, external_kafka, external_monoid,
+      .aggregate(shaded, core, streaming, services, external_kafka, external_monoid,
       external_serializer, examples, storm, yarn, external_hbase, gearpumpHadoop, packProject,
       external_hadoopfs, integration_test).settings(Defaults.itSettings: _*)
       .disablePlugins(sbtassembly.AssemblyPlugin)
@@ -271,20 +266,13 @@ object Build extends sbt.Build {
           getShadedDepXML(organization.value, shaded_guava.id, version.value),
           getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node)
       }
-    ))
-      .disablePlugins(sbtassembly.AssemblyPlugin)
+    )).disablePlugins(sbtassembly.AssemblyPlugin)
 
-  lazy val daemon = Project(
-    id = "gearpump-daemon",
-    base = file("daemon"),
-    settings = commonSettings ++ daemonDependencies)
-      .dependsOn(core % "test->test; compile->compile", cgroup % "test->test; compile->compile")
-      .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val cgroup = Project(
     id = "gearpump-experimental-cgroup",
     base = file("experiments/cgroup"),
-    settings = commonSettings ++ noPublish ++ daemonDependencies)
+    settings = commonSettings ++ noPublish)
       .dependsOn (core % "test->test; compile->compile")
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
@@ -301,7 +289,7 @@ object Build extends sbt.Build {
           getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node)
       }
     ))
-    .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test")
+    .dependsOn(core % "test->test; compile->compile", shaded_gs_collections)
     .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val external_kafka = Project(
@@ -412,19 +400,18 @@ object Build extends sbt.Build {
         ),
         mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test")
       ))
-      .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+      .dependsOn(streaming % "test->test; provided")
 
   lazy val redis = Project(
     id = "gearpump-experiments-redis",
     base = file("experiments/redis"),
-    settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+    settings = commonSettings ++ noPublish ++
       Seq(
         libraryDependencies ++= Seq(
           "redis.clients" % "jedis" % "2.9.0"
-        ),
-        mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
-      ))
-    .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+        )
+      )
+  ).dependsOn(streaming % "test->test; provided")
 
   lazy val storm = Project(
     id = "gearpump-experiments-storm",
@@ -489,7 +476,7 @@ object Build extends sbt.Build {
           "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided"
         )
       ))
-      .dependsOn(services % "test->test;compile->compile", daemon % "provided",
+      .dependsOn(services % "test->test;compile->compile",
         core % "provided", gearpumpHadoop).disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val external_hbase = Project(

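The build change mirrors the code move: the former daemonDependencies
(akka-cluster, akka-cluster-tools, commons-logging,
akka-distributed-data-experimental) fold into coreDependencies, the
gearpump-daemon project disappears, and every dependsOn that referenced daemon
now points at core or streaming alone. A sketch of the resulting pattern for a
downstream module (the module name is illustrative; commonSettings, noPublish
and streaming are the definitions above):

    lazy val someModule = Project(
      id = "gearpump-experiments-some-module",
      base = file("experiments/some-module"),
      settings = commonSettings ++ noPublish)
      .dependsOn(streaming % "test->test; provided") // daemon % "..." is gone
      .disablePlugins(sbtassembly.AssemblyPlugin)
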
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/BuildExample.scala
----------------------------------------------------------------------
diff --git a/project/BuildExample.scala b/project/BuildExample.scala
index 75fc9be..fadc1ec 100644
--- a/project/BuildExample.scala
+++ b/project/BuildExample.scala
@@ -42,7 +42,7 @@ object BuildExample extends sbt.Build {
         target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" /
           CrossVersion.binaryScalaVersion(scalaVersion.value)
       )
-  ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+  ) dependsOn(streaming % "test->test; provided")
 
   lazy val wordcount = Project(
     id = "gearpump-examples-wordcount",
@@ -55,7 +55,7 @@ object BuildExample extends sbt.Build {
         target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" /
           CrossVersion.binaryScalaVersion(scalaVersion.value)
       )
-  ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+  ) dependsOn(streaming % "test->test; provided")
 
   lazy val sol = Project(
     id = "gearpump-examples-sol",
@@ -113,7 +113,7 @@ object BuildExample extends sbt.Build {
         target in assembly := baseDirectory.value.getParentFile / "target" /
           CrossVersion.binaryScalaVersion(scalaVersion.value)
       )
-  ) dependsOn (daemon % "test->test; provided")
+  ) dependsOn (core % "test->test; provided")
 
   lazy val distributeservice = Project(
     id = "gearpump-examples-distributeservice",
@@ -133,7 +133,7 @@ object BuildExample extends sbt.Build {
         target in assembly := baseDirectory.value.getParentFile / "target" /
           CrossVersion.binaryScalaVersion(scalaVersion.value)
       )
-  ) dependsOn (daemon % "test->test; provided")
+  ) dependsOn (core % "test->test; provided")
 
   lazy val fsio = Project(
     id = "gearpump-examples-fsio",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index 13e53de..54b1d43 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -24,7 +24,6 @@ import xerial.sbt.Pack._
 object Pack extends sbt.Build {
   val daemonClassPath = Seq(
     "${PROG_HOME}/conf",
-    "${PROG_HOME}/lib/daemon/*",
     // This is for DFSJarStore
     "${PROG_HOME}/lib/yarn/*"
   )
@@ -37,14 +36,12 @@ object Pack extends sbt.Build {
 
   val serviceClassPath = Seq(
     "${PROG_HOME}/conf",
-    "${PROG_HOME}/lib/daemon/*",
     "${PROG_HOME}/lib/services/*",
     "${PROG_HOME}/dashboard"
   )
 
   val yarnClassPath = Seq(
     "${PROG_HOME}/conf",
-    "${PROG_HOME}/lib/daemon/*",
     "${PROG_HOME}/lib/services/*",
     "${PROG_HOME}/lib/yarn/*",
     "${PROG_HOME}/conf/yarnconf",
@@ -112,11 +109,10 @@ object Pack extends sbt.Build {
             "-Dgearpump.home=${PROG_HOME}")
         ),
         packLibDir := Map(
-          "lib" -> new ProjectsToPack(core.id, streaming.id),
-          "lib/daemon" -> new ProjectsToPack(daemon.id, cgroup.id).exclude(core.id, streaming.id),
+          "lib" -> new ProjectsToPack(core.id, cgroup.id, streaming.id),
           "lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id).
-            exclude(services.id, daemon.id, core.id),
-          "lib/services" -> new ProjectsToPack(services.id).exclude(daemon.id),
+            exclude(services.id, core.id),
+          "lib/services" -> new ProjectsToPack(services.id).exclude(core.id),
           "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id)
         ),
         packExclude := Seq(thisProjectRef.value.project),
@@ -139,7 +135,7 @@ object Pack extends sbt.Build {
           "gear" -> applicationClassPath,
           "local" -> daemonClassPath,
           "master" -> daemonClassPath,
-          "worker" -> daemonClassPath,
+          "worker" -> applicationClassPath,
           "services" -> serviceClassPath,
           "yarnclient" -> yarnClassPath,
           "storm" -> stormClassPath
@@ -149,6 +145,6 @@ object Pack extends sbt.Build {
         packArchiveExcludes := Seq("integrationtest")
 
       )
-  ).dependsOn(core, streaming, services, yarn, storm).
+  ).dependsOn(core, streaming, services, yarn, storm, cgroup).
     disablePlugins(sbtassembly.AssemblyPlugin)
 }

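Pack.scala is the packaging counterpart of the merge: with no daemon artifact,
"${PROG_HOME}/lib/daemon/*" drops out of every launcher classpath, the former
lib/daemon jars land in lib/, and the "worker" launcher moves from the daemon
classpath to the application one. The resulting jar layout, restated from the
packLibDir mapping above (a summary sketch, not build code):

    val packLayout = Map(
      "lib"          -> Seq("core", "cgroup", "streaming"), // daemon jars merged in
      "lib/yarn"     -> Seq("gearpumpHadoop", "yarn"),      // excluding services, core
      "lib/services" -> Seq("services"),                    // excluding core
      "lib/storm"    -> Seq("storm"))                       // excluding streaming
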


[3/4] incubator-gearpump git commit: [GEARPUMP-224] merge gearpump-daemon to gearpump-core

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
new file mode 100644
index 0000000..447b034
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.net.URL
+import java.util.concurrent.{Executors, TimeUnit}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceSucceed, UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
+import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
+import org.apache.gearpump.jarstore.JarStoreClient
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.util.ActorSystemBooter.Daemon
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util.{TimeOutScheduler, _}
+import org.slf4j.Logger
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Worker tracks the resources on a single machine; it plays a role similar to
+ * the node manager of YARN.
+ *
+ * @param masterProxy masterProxy is used to resolve the master
+ */
+private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
+  private val systemConfig: Config = context.system.settings.config
+
+  private val address = ActorUtil.getFullPath(context.system, self.path)
+  private var resource = Resource.empty
+  private var allocatedResources = Map[ActorRef, Resource]()
+  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
+  private var id: WorkerId = WorkerId.unspecified
+  private val createdTime = System.currentTimeMillis()
+  private var masterInfo: MasterInfo = null
+  private var executorNameToActor = Map.empty[String, ActorRef]
+  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
+  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
+
+  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
+
+  private var totalSlots: Int = 0
+
+  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+  var historyMetricsService: Option[ActorRef] = None
+
+  override def receive: Receive = null
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  def service: Receive =
+    appMasterMsgHandler orElse
+      clientMessageHandler orElse
+      metricsService orElse
+      terminationWatch(masterInfo.master) orElse
+      ActorUtil.defaultMsgHandler(self)
+
+  def metricsService: Receive = {
+    case query: QueryHistoryMetrics =>
+      if (historyMetricsService.isEmpty) {
+        // Returns empty metrics so that we don't hang the UI
+        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      } else {
+        historyMetricsService.get forward query
+      }
+  }
+
+  private var metricsInitialized = false
+
+  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+  private def initializeMetrics(): Unit = {
+    // Registers jvm metrics
+    val metricsSetName = "worker" + WorkerId.render(id)
+    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
+
+    historyMetricsService = if (metricsEnabled) {
+      val historyMetricsService = {
+        context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
+      }
+
+      val metricsReportService = context.actorOf(Props(
+        new MetricsReporterService(Metrics(context.system))))
+      historyMetricsService.tell(ReportMetrics, metricsReportService)
+      Some(historyMetricsService)
+    } else {
+      None
+    }
+  }
+
+  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
+
+    // If the master gets disconnected, WorkerRegistered may be triggered multiple times.
+    case WorkerRegistered(id, masterInfo) =>
+      this.id = id
+
+      // Check the flag so that we don't re-initialize the metrics when the worker
+      // re-registers itself.
+      if (!metricsInitialized) {
+        initializeMetrics()
+        metricsInitialized = true
+      }
+
+      this.masterInfo = masterInfo
+      timeoutTicker.cancel()
+      context.watch(masterInfo.master)
+      this.LOG = LogUtil.getLogger(getClass, worker = id)
+      LOG.info(s"Worker is registered. " +
+        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
+      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
+        resourceUpdateTimeoutMs, updateResourceTimeOut())
+      context.become(service)
+  }
+
+  private def updateResourceTimeOut(): Unit = {
+    LOG.error(s"Update worker resource time out")
+  }
+
+  def appMasterMsgHandler: Receive = {
+    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
+      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      val executorToStop = executorNameToActor.get(actorName)
+      if (executorToStop.isDefined) {
+        LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
+          s"due to: $reason")
+        executorToStop.get.forward(shutdown)
+      } else {
+        LOG.error(s"Cannot find executor $actorName, ignore this message")
+        sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
+      }
+    case launch: LaunchExecutor =>
+      LOG.info(s"$launch")
+      if (resource < launch.resource) {
+        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
+      } else {
+        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
+
+        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
+          jarStoreClient, executorProcLauncher))
+        executorNameToActor += actorName -> executor
+
+        resource = resource - launch.resource
+        allocatedResources = allocatedResources + (executor -> launch.resource)
+
+        reportResourceToMaster()
+        executorsInfo += executor ->
+          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
+        context.watch(executor)
+      }
+    case UpdateResourceFailed(reason, ex) =>
+      LOG.error(reason)
+      context.stop(self)
+    case UpdateResourceSucceed =>
+      LOG.info(s"Update resource succeed")
+    case GetWorkerData(workerId) =>
+      val aliveFor = System.currentTimeMillis() - createdTime
+      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+      val userDir = System.getProperty("user.dir")
+      sender ! WorkerData(WorkerSummary(
+        id, "active",
+        address,
+        aliveFor,
+        logDir,
+        executorsInfo.values.toArray,
+        totalSlots,
+        resource.slots,
+        userDir,
+        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+        resourceManagerContainerId = systemConfig.getString(
+          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
+        historyMetricsConfig = getHistoryMetricsConfig)
+      )
+    case ChangeExecutorResource(appId, executorId, usedResource) =>
+      for (executor <- executorActorRef(appId, executorId);
+        allocatedResource <- allocatedResources.get(executor)) {
+
+        allocatedResources += executor -> usedResource
+        resource = resource + allocatedResource - usedResource
+        reportResourceToMaster()
+
+        if (usedResource == Resource(0)) {
+          executorsInfo -= executor
+          allocatedResources -= executor
+          // Stop the executor if there is no resource bound to it.
+          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
+          executor ! ShutdownExecutor(appId, executorId,
+            "Shutdown executor because the resource used is zero")
+        }
+      }
+  }
+
+  private def reportResourceToMaster(): Unit = {
+    sendMsgWithTimeOutCallBack(masterInfo.master,
+      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
+  }
+
+  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
+    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+    executorNameToActor.get(actorName)
+  }
+
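+  // Answers config queries from clients; returns an empty config when the
+  // requested workerId does not match this worker.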
+  def clientMessageHandler: Receive = {
+    case QueryWorkerConfig(workerId) =>
+      if (this.id == workerId) {
+        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+      } else {
+        sender ! WorkerConfig(ConfigFactory.empty)
+      }
+  }
+
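+  // Re-sends RegisterWorker (with the previously assigned workerId) every two
+  // seconds until the master confirms, killing the worker after timeOutSeconds.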
+  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
+    repeatActionUtil(
+      seconds = timeOutSeconds,
+      action = () => {
+        masterProxy ! RegisterWorker(workerId)
+      },
+      onTimeout = () => {
+        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
+          s"seconds, abort and kill the worker...")
+        self ! PoisonPill
+      })
+  }
+
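+  // Watches for two kinds of death: the master (re-register through the master
+  // proxy) and child executors (reclaim their resource and report to master).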
+  def terminationWatch(master: ActorRef): Receive = {
+    case Terminated(actor) =>
+      if (actor.compareTo(master) == 0) {
+        // The parent master is down; try to register to a new master through
+        // the master proxy.
+        LOG.info(s"Master cannot be contacted, trying to find a new master...")
+        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
+      } else if (ActorUtil.isChildActorPath(self, actor)) {
+        // One executor is down; reclaim its resource and report to the master.
+        LOG.info(s"Executor is down: ${getExecutorName(actor)}")
+
+        val allocated = allocatedResources.get(actor)
+        if (allocated.isDefined) {
+          resource = resource + allocated.get
+          executorsInfo -= actor
+          allocatedResources = allocatedResources - actor
+          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
+            resourceUpdateTimeoutMs, updateResourceTimeOut())
+        }
+      }
+  }
+
+  private def getExecutorName(actorRef: ActorRef): Option[String] = {
+    executorNameToActor.find(_._2 == actorRef).map(_._1)
+  }
+
+  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
+    val launcherClazz = Class.forName(
+      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
+    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
+      .asInstanceOf[ExecutorProcessLauncher]
+  }
+
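+  // On start, the worker registers with the master proxy and waits for
+  // WorkerRegistered, killing itself if no confirmation arrives in 30 seconds.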
+  import context.dispatcher
+  override def preStart(): Unit = {
+    LOG.info(s"RegisterNewWorker")
+    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
+    this.resource = Resource(totalSlots)
+    masterProxy ! RegisterNewWorker
+    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
+  }
+
+  private def registerTimeoutTicker(seconds: Int): Cancellable = {
+    repeatActionUtil(seconds, () => (), () => {
+      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
+        s"abort and kill the worker...")
+      self ! PoisonPill
+    })
+  }
+
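+  // Runs `action` every two seconds and fires `onTimeout` once after `seconds`;
+  // the returned Cancellable cancels both schedules.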
+  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
+    : Cancellable = {
+    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
+      Duration(2, TimeUnit.SECONDS))(action())
+    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
+    new Cancellable {
+      def cancel(): Boolean = {
+        val result1 = cancelTimeout.cancel()
+        val result2 = cancelSuicide.cancel()
+        result1 && result2
+      }
+
+      def isCancelled: Boolean = {
+        cancelTimeout.isCancelled && cancelSuicide.isCancelled
+      }
+    }
+  }
+
+  override def postStop(): Unit = {
+    LOG.info(s"Worker is going down....")
+    ioPool.shutdown()
+    context.system.terminate()
+  }
+}
+
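+// A minimal wiring sketch (illustrative; `masters` is an assumed List[HostPort],
+// see MiniCluster and MainSpec in this commit for concrete setups):
+//   val system = ActorSystem("worker", workerConfig)
+//   val masterProxy = system.actorOf(MasterProxy.props(masters))
+//   system.actorOf(Props(classOf[Worker], masterProxy), "worker")
+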
+private[cluster] object Worker {
+
+  case class ExecutorResult(result: Try[Int])
+
+  class ExecutorWatcher(
+      launch: LaunchExecutor,
+      masterInfo: MasterInfo,
+      ioPool: ExecutionContext,
+      jarStoreClient: JarStoreClient,
+      procLauncher: ExecutorProcessLauncher) extends Actor {
+    import launch.{appId, executorId, resource}
+
+    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
+
+    val executorConfig: Config = {
+      val workerConfig = context.system.settings.config
+
+      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
+        Option(jvmConfig.executorAkkaConfig)
+      }.getOrElse(ConfigFactory.empty())
+
+      resolveExecutorConfig(workerConfig, submissionConfig)
+    }
+
+    // For some settings the worker-side config takes priority; for others, the
+    // user's application submission config wins.
+    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
+      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
+        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
+        .withoutPath(GEARPUMP_HOME)
+        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
+        .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
+        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
+        // Falls back to workerConfig
+        .withFallback(workerConfig)
+
+      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
+      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
+      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
+        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
+        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
+      } else {
+        config
+      }
+
+      // Excludes reference.conf and JVM properties.
+      ClusterConfig.filterOutDefaultConfig(updatedConf)
+    }
+
+    implicit val executorService = ioPool
+
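+    // Either runs the executor inside this JVM (when
+    // GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS is set) or forks a
+    // separate executor process.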
+    private val executorHandler = {
+      val ctx = launch.executorJvmConfig
+
+      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
+        new ExecutorHandler {
+          val exitPromise = Promise[Int]()
+          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
+
+          override def destroy(): Unit = {
+            context.stop(app)
+          }
+          override def exitValue: Future[Int] = {
+            exitPromise.future
+          }
+        }
+      } else {
+        createProcess(ctx)
+      }
+    }
+
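+    // Forks a separate executor JVM: stages the application jar and a rendered
+    // config file into temp files, assembles the classpath and JVM options, then
+    // delegates to the configured ExecutorProcessLauncher.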
+    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
+
+      val process = Future {
+        val jarPath = ctx.jar.map { appJar =>
+          val tempFile = File.createTempFile(appJar.name, ".jar")
+          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
+          val file = new URL("file:" + tempFile)
+          file.getFile
+        }
+
+        val configFile = {
+          val configFile = File.createTempFile("gearpump", ".conf")
+          ClusterConfig.saveConfig(executorConfig, configFile)
+          val file = new URL("file:" + configFile)
+          file.getFile
+        }
+
+        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
+          ctx.classPath.map(path => expandEnvironment(path)) ++
+          jarPath.map(Array(_)).getOrElse(Array.empty[String])
+
+        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
+        val logArgs = List(
+          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
+          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
+          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
+          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
+        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
+
+        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
+
+        // Remote debug executor process
+        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
+        val remoteDebugConfig = if (remoteDebugFlag) {
+          val availablePort = Util.findFreePort().get
+          List(
+            "-Xdebug",
+            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
+            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
+        val verboseGCConfig = if (verboseGCFlag) {
+          List(
+            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
+            "-verbose:gc",
+            "-XX:+PrintGCDetails",
+            "-XX:+PrintGCDateStamps",
+            "-XX:+PrintTenuringDistribution",
+            "-XX:+PrintGCApplicationConcurrentTime",
+            "-XX:+PrintGCApplicationStoppedTime"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val ipv4 = List(s"-D${PREFER_IPV4}=true")
+
+        val options = ctx.jvmArguments ++ username ++
+          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
+
+        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
+          options, classPath, ctx.mainClass, ctx.arguments)
+
+        ProcessInfo(process, jarPath, configFile)
+      }
+
+      new ExecutorHandler {
+
+        var destroyed = false
+
+        override def destroy(): Unit = {
+          LOG.info(s"Destroy executor process ${ctx.mainClass}")
+          if (!destroyed) {
+            destroyed = true
+            process.foreach { info =>
+              info.process.destroy()
+              info.jarPath.foreach(new File(_).delete())
+              new File(info.configFile).delete()
+            }
+          }
+        }
+
+        override def exitValue: Future[Int] = {
+          process.flatMap { info =>
+            val exit = info.process.exitValue()
+            if (exit == 0) {
+              Future.successful(0)
+            } else {
+              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
+              s"error summary: ${info.process.logger.error}"))
+            }
+          }
+        }
+      }
+    }
+
+    private def expandEnvironment(path: String): String = {
+      // TODO: extend this to support more environment variables.
+      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
+    }
+
+    override def preStart(): Unit = {
+      executorHandler.exitValue.onComplete { value =>
+        procLauncher.cleanProcess(appId, executorId)
+        val result = ExecutorResult(value)
+        self ! result
+      }
+    }
+
+    override def postStop(): Unit = {
+      executorHandler.destroy()
+    }
+
+    // The folders are under ${GEARPUMP_HOME}
+    val daemonPathPattern = List("lib" + File.separator + "yarn")
+
+    override def receive: Receive = {
+      case ShutdownExecutor(appId, executorId, reason: String) =>
+        executorHandler.destroy()
+        sender ! ShutdownExecutorSucceed(appId, executorId)
+        context.stop(self)
+      case ExecutorResult(executorResult) =>
+        executorResult match {
+          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
+          case Failure(e) => LOG.error("Executor exit with errors", e)
+        }
+        context.stop(self)
+    }
+
+    private def getFormattedTime(timestamp: Long): String = {
+      val datePattern = "yyyy-MM-dd-HH-mm"
+      val format = new java.text.SimpleDateFormat(datePattern)
+      format.format(timestamp)
+    }
+
+    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
+      classPath.filterNot(matchDaemonPattern(_))
+    }
+
+    private def matchDaemonPattern(path: String): Boolean = {
+      daemonPathPattern.exists(path.contains(_))
+    }
+  }
+
+  trait ExecutorHandler {
+    def destroy(): Unit
+    def exitValue: Future[Int]
+  }
+
+  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
+
+  /**
+   * Starts the executor in the same JVM as the worker.
+   */
+  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
+    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
+    private val exitCode = 0
+
+    override val supervisorStrategy =
+      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
+        case ex: Throwable =>
+          LOG.error(s"system $name stopped ", ex)
+          exit.failure(ex)
+          Stop
+      }
+
+    override def postStop(): Unit = {
+      if (!exit.isCompleted) {
+        exit.success(exitCode)
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
new file mode 100644
index 0000000..0a22245
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.TestActorRef
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.master.Master
+import org.apache.gearpump.cluster.worker.Worker
+import org.apache.gearpump.util.Constants
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+class MiniCluster {
+  private val mockMasterIP = "127.0.0.1"
+
+  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
+    withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
+
+  val (mockMaster, worker) = {
+    val master = system.actorOf(Props(classOf[Master]), "master")
+    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
+
+    // Wait until the worker registers itself to the master
+    waitUntilWorkerIsRegistered(master)
+    (master, worker)
+  }
+
+  def launchActor(props: Props): TestActorRef[Actor] = {
+    TestActorRef(props)
+  }
+
+  private def waitUntilWorkerIsRegistered(master: ActorRef): Unit = {
+    while (!isWorkerRegistered(master)) {}
+  }
+
+  private def isWorkerRegistered(master: ActorRef): Boolean = {
+    import scala.concurrent.duration._
+    implicit val dispatcher = system.dispatcher
+
+    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
+
+    val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]
+
+    // Waits until the worker is registered.
+    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
+    workers.workers.nonEmpty
+  }
+
+  def shutDown(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
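+
+// A minimal usage sketch (illustrative only; the anonymous actor is a stand-in
+// for whatever the test needs to launch):
+//   val cluster = new MiniCluster
+//   val probe = cluster.launchActor(Props(new Actor { def receive = { case _ => } }))
+//   cluster.shutDown()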

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
new file mode 100644
index 0000000..f9b0762
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.{TestUtil, _}
+import org.apache.gearpump.util.LogUtil
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.util.Success
+
+class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  var kvService: TestProbe = null
+  var appLauncher: TestProbe = null
+  var appManager: ActorRef = null
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    kvService = TestProbe()(getActorSystem)
+    appLauncher = TestProbe()(getActorSystem)
+
+    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+      new DummyAppMasterLauncherFactory(appLauncher))))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "AppManager" should "handle AppMaster message correctly" in {
+    val appMaster = TestProbe()(getActorSystem)
+    val appId = 1
+
+    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
+    appMaster.send(appManager, register)
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    appMaster.send(appManager, ActivateAppMaster(appId))
+    appMaster.expectMsgType[AppMasterActivated]
+  }
+
+  "DataStoreService" should "support Put and Get" in {
+    val appMaster = TestProbe()(getActorSystem)
+    appMaster.send(appManager, SaveAppData(0, "key", 1))
+    kvService.expectMsgType[PutKV]
+    kvService.reply(PutKVSuccess)
+    appMaster.expectMsg(AppDataSaved)
+
+    appMaster.send(appManager, GetAppData(0, "key"))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess("key", 1))
+    appMaster.expectMsg(GetAppDataResult("key", 1))
+  }
+
+  "AppManager" should "support application submission and shutdown" in {
+    testClientSubmission(withRecover = false)
+  }
+
+  "AppManager" should "support application submission and recover if appmaster dies" in {
+    LOG.info("=================testing recover==============")
+    testClientSubmission(withRecover = true)
+  }
+
+  "AppManager" should "handle client message correctly" in {
+    val mockClient = TestProbe()(getActorSystem)
+    mockClient.send(appManager, ShutdownApplication(1))
+    assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
+
+    mockClient.send(appManager, ResolveAppId(1))
+    assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
+
+    mockClient.send(appManager, AppMasterDataRequest(1))
+    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+  }
+
+  "AppManager" should "reject the application submission if the app name already existed" in {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, submit)
+    assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
+  }
+
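+  // Drives a full submission round-trip: submit -> state persisted via PutKV ->
+  // launcher started -> AppMaster registered -> app resolvable; then either a
+  // clean shutdown or a recovery after the AppMaster dies.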
+  def testClientSubmission(withRecover: Boolean): Unit = {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val worker = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    kvService.expectMsgType[PutKV]
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, ResolveAppId(appId))
+    client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
+
+    client.send(appManager, AppMastersDataRequest)
+    client.expectMsgType[AppMastersData]
+
+    client.send(appManager, AppMasterDataRequest(appId, false))
+    client.expectMsgType[AppMasterData]
+
+    if (!withRecover) {
+      client.send(appManager, ShutdownApplication(appId))
+      client.expectMsg(ShutdownApplicationResult(Success(appId)))
+    } else {
+      // Do recovery
+      getActorSystem.stop(appMaster.ref)
+      kvService.expectMsgType[GetKV]
+      val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
+      kvService.reply(GetKVSuccess(APP_STATE, appState))
+      appLauncher.expectMsg(LauncherStarted(appId))
+    }
+  }
+}
+
+class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
+  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+    Props(new DummyAppMasterLauncher(test, appId))
+  }
+}
+
+class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
+  test.ref ! LauncherStarted(appId)
+  
+  override def receive: Receive = {
+    case any: Any => test.ref forward any
+  }
+}
+
+case class LauncherStarted(appId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
new file mode 100644
index 0000000..d3e739f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.Props
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.InMemoryKVService
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+class InMemoryKVServiceSpec
+  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  override def config: Config = TestUtil.MASTER_CONFIG
+
+  "KVService" should "get, put, delete correctly" in {
+    val system = getActorSystem
+    val kvService = system.actorOf(Props(new InMemoryKVService()))
+    val group = "group"
+
+    val client = TestProbe()(system)
+
+    client.send(kvService, PutKV(group, "key", 1))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, PutKV(group, "key", 2))
+    client.expectMsg(PutKVSuccess)
+
+    client.send(kvService, GetKV(group, "key"))
+    client.expectMsg(GetKVSuccess("key", 2))
+
+    client.send(kvService, DeleteKVGroup(group))
+
+    // After DeleteKVGroup, the service no longer accepts Get or Put messages
+    // for this group.
+    client.send(kvService, GetKV(group, "key"))
+    client.expectNoMsg(3.seconds)
+
+    client.send(kvService, PutKV(group, "key", 3))
+    client.expectNoMsg(3.seconds)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
new file mode 100644
index 0000000..2166976
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.Properties
+
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
+import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
+import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, LogUtil, Util}
+import org.scalatest._
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "Worker" should "register worker address to master when started." in {
+
+    val masterReceiver = createMockMaster()
+
+    val tempTestConf = convertTestConf(getHost, getPort)
+
+    val options = Array(
+      s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
+      s"-D${PREFER_IPV4}=true"
+    ) ++ getMasterListOption()
+
+    val worker = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Worker),
+      Array.empty)
+
+    try {
+      masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
+
+      tempTestConf.delete()
+    } finally {
+      worker.destroy()
+    }
+  }
+
+  "Master" should "accept worker RegisterNewWorker when started" in {
+    val worker = TestProbe()(getActorSystem)
+
+    val host = "127.0.0.1"
+    val port = Util.findFreePort().get
+
+    val properties = new Properties()
+    properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
+    properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
+    val masterConfig = ConfigFactory.parseProperties(properties)
+      .withFallback(TestUtil.MASTER_CONFIG)
+    Future {
+      Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
+    }
+
+    val masterProxy = getActorSystem.actorOf(
+      MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+
+    worker.send(masterProxy, RegisterNewWorker)
+    worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+  }
+
+  "Info" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
+    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
+  }
+
+  "Kill" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Kill.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
+    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
+  }
+
+  "Replay" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Replay.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
+    masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ReplayApplicationResult(Success(0)))
+  }
+
+  "Local" should "be started without exception" in {
+    val port = Util.findFreePort().get
+    val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
+      s"-D${PREFER_IPV4}=true")
+
+    val local = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Local),
+      Array.empty)
+
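+    // Polls `fn` once per second until it returns true or `times` attempts are
+    // exhausted, returning the last result.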
+    def retry(times: Int)(fn: => Boolean): Boolean = {
+
+      LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
+
+      val result = fn
+      if (result || times <= 0) {
+        result
+      } else {
+        Thread.sleep(1000)
+        retry(times - 1)(fn)
+      }
+    }
+
+    try {
+      assert(retry(10)(isPortUsed("127.0.0.1", port)),
+        "local is not started successfully, as port is not used " + port)
+    } finally {
+      local.destroy()
+    }
+  }
+
+  "Gear" should "support app|info|kill|shell|replay" in {
+
+    val commands = Array("app", "info", "kill", "shell", "replay")
+
+    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
+
+    for (command <- commands) {
+      assert(Try(Gear.main(Array("-noexist"))).isFailure,
+        "pass unknown option, throw, command: " + command)
+    }
+
+    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
+
+    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
+    assert(tryThis.isFailure, "an unknown command with an unknown option should throw")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
new file mode 100644
index 0000000..b48fc2a
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.TestUtil
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class MasterWatcherSpec extends FlatSpec with Matchers {
+  def config: Config = TestUtil.MASTER_CONFIG
+
+  "MasterWatcher" should "kill itself when can not get a quorum" in {
+    val system = ActorSystem("ForMasterWatcher", config)
+
+    val actorWatcher = TestProbe()(system)
+
+    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
+    actorWatcher watch masterWatcher
+    actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
new file mode 100644
index 0000000..8a3d7d1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+
+class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
+  val appId = 0
+  val workerId1: WorkerId = WorkerId(1, 0L)
+  val workerId2: WorkerId = WorkerId(2, 0L)
+  val mockAppMaster = TestProbe()
+  val mockWorker1 = TestProbe()
+  val mockWorker2 = TestProbe()
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The scheduler" should {
+    "update resource only when the worker is registered" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
+      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
+        s"registered into master"))
+    }
+
+    "drop application's resource requests when the application is removed" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      mockAppMaster.expectNoMsg(5.seconds)
+    }
+  }
+
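+  // ResourceAllocated carries an Array, whose default equality is by reference,
+  // so compare the allocations element-wise after sorting by workerId.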
+  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
+    left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+  }
+
+  "The resource request with higher priority" should {
+    "be handled first" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The resource request which delivered earlier" should {
+    "be handled first if the priorities are the same" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different relaxation" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
+      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      val request3 = ResourceRequest(
+        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+
+      expect = ResourceAllocated(Array(
+        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      // We have to manually update the resource on each worker
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different executor number" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      // By default, the request requires only one executor
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations2.allocations.length == 1)
+      assert(allocations2.allocations.head.resource == Resource(20))
+
+      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations3.allocations.length == 3)
+      assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+      // The total available resource cannot satisfy a request for this many executors
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+      val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations4.allocations.length == 2)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+      // When new resources are available, the remaining request will be satisfied
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+      val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations5.allocations.length == 1)
+      assert(allocations5.allocations.forall(_.resource == Resource(20)))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
new file mode 100644
index 0000000..e0233f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import akka.actor.{ActorSystem, PoisonPill, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+import org.scalatest._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  val appId = 1
+  val workerId: WorkerId = WorkerId(1, 0L)
+  val executorId = 1
+  var masterProxy: TestProbe = null
+  var mockMaster: TestProbe = null
+  var client: TestProbe = null
+  val workerSlots = 50
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    mockMaster = TestProbe()(getActorSystem)
+    masterProxy = TestProbe()(getActorSystem)
+    client = TestProbe()(getActorSystem)
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "The new started worker" should {
+    "kill itself if no response from Master after registering" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+      mockMaster.expectTerminated(worker, 60.seconds)
+    }
+  }
+
+  "Worker" should {
+    "init its resource from the gearpump config" in {
+      val config = ConfigFactory.parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots").
+        withFallback(TestUtil.DEFAULT_CONFIG)
+      val workerSystem = ActorSystem("WorkerSystem", config)
+      val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
+
+      worker.tell(
+        UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
+      mockMaster.expectTerminated(worker, 5.seconds)
+      workerSystem.terminate()
+      Await.result(workerSystem.whenTerminated, Duration.Inf)
+    }
+  }
+
+  "Worker" should {
+    "update its remaining resource when launching and shutting down executors" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
+      masterProxy.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+
+      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      // This is an actor path which the ActorSystemBooter will report back to,
+      // not needed in this test
+      val reportBack = "dummy"
+      val executionContext = ExecutorJVMConfig(Array.empty[String],
+        getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
+        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
+        username = "user")
+
+      // Test LaunchExecutor
+      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
+        mockMaster.ref)
+      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
+
+      worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
+
+      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
+
+      // Test terminationWatch
+      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+      client.expectMsg(ShutdownExecutorSucceed(1, 1))
+
+      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
+      client.expectMsg(ShutdownExecutorFailed(
+        s"Cannot find executor ${executorId + 1} for app $appId"))
+
+      mockMaster.ref ! PoisonPill
+      masterProxy.expectMsg(RegisterWorker(workerId))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
deleted file mode 100644
index 9e55be6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.worker.WorkerId
-
-/**
- * Cluster Bootup Flow
- */
-object WorkerToMaster {
-
-  /** When an worker is started, it sends RegisterNewWorker */
-  case object RegisterNewWorker
-
-  /** When worker lose connection with master, it tries to register itself again with old Id. */
-  case class RegisterWorker(workerId: WorkerId)
-
-  /** Worker is responsible to broadcast its current status to master */
-  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
-}
-
-object MasterToWorker {
-
-  /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
-  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
-
-  /** The worker has not received a reply from the master */
-  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
-
-  /** Master is synced with the worker on the resource slots managed by that worker */
-  case object UpdateResourceSucceed
-}
\ No newline at end of file

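Taken together, these messages define the bootup handshake. A minimal sketch of the exchange, with akka-testkit probes standing in for both sides (the probe wiring and the sample WorkerId are illustrative, not part of the protocol):

import akka.actor.ActorSystem
import akka.testkit.TestProbe

import org.apache.gearpump.cluster.MasterToWorker._
import org.apache.gearpump.cluster.WorkerToMaster._
import org.apache.gearpump.cluster.master.Master.MasterInfo
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.cluster.worker.WorkerId

object BootupFlowSketch extends App {
  implicit val system = ActorSystem("bootup-sketch")
  val master = TestProbe()
  val worker = TestProbe()

  // 1. A fresh worker announces itself; it has no id yet.
  worker.send(master.ref, RegisterNewWorker)
  master.expectMsg(RegisterNewWorker)

  // 2. The master assigns a WorkerId and confirms the registration.
  val workerId = WorkerId(0, System.currentTimeMillis())
  master.reply(WorkerRegistered(workerId, MasterInfo(master.ref)))
  worker.expectMsg(WorkerRegistered(workerId, MasterInfo(master.ref)))

  // 3. From then on, the worker broadcasts its resource status.
  worker.send(master.ref, ResourceUpdate(worker.ref, workerId, Resource(100)))
  master.expectMsg(ResourceUpdate(worker.ref, workerId, Resource(100)))
  master.reply(UpdateResourceSucceed)

  system.terminate()
}
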
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
deleted file mode 100644
index 9bde4d1..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.embedded
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{Config, ConfigValueFactory}
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
-import org.apache.gearpump.util.{LogUtil, Util}
-
-/**
- * Creates an in-process cluster with a single worker
- */
-class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  def start(): Unit = {
-    val port = Util.findFreePort().get
-    val akkaConf = getConfig(inputConfig, port)
-    _config = akkaConf
-    val system = ActorSystem(MASTER, akkaConf)
-
-    val master = system.actorOf(Props[MasterActor], MASTER)
-
-    0.until(workerCount).foreach { id =>
-      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-    }
-    this._master = master
-    this._system = system
-
-    LOG.info("=================================")
-    LOG.info("Local Cluster is started at: ")
-    LOG.info(s"                 127.0.0.1:$port")
-    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
-  }
-
-  private def getConfig(inputConfig: Config, port: Int): Config = {
-    val config = inputConfig.
-      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(GEARPUMP_CLUSTER_MASTERS,
-        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
-      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
-        ConfigValueFactory.fromAnyRef(true)).
-      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
-      withValue("akka.actor.provider",
-        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
-    config
-  }
-
-  def newClientContext: ClientContext = {
-    ClientContext(_config, _system, _master)
-  }
-
-  def stop(): Unit = {
-    _system.stop(_master)
-    _system.terminate()
-    Await.result(_system.whenTerminated, Duration.Inf)
-  }
-}
-
-object EmbeddedCluster {
-  def apply(): EmbeddedCluster = {
-    new EmbeddedCluster(ClusterConfig.master())
-  }
-}
\ No newline at end of file

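A minimal usage sketch for the class above (the application submitted through the ClientContext is elided, since it depends on the job being run):

import org.apache.gearpump.cluster.embedded.EmbeddedCluster

object EmbeddedClusterSketch extends App {
  // Boots an in-process master plus one worker on a free port.
  val cluster = EmbeddedCluster()
  cluster.start()

  // The client context talks to the embedded master directly.
  val context = cluster.newClientContext
  // ... submit an application via `context` here ...

  cluster.stop()
}
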
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
deleted file mode 100644
index db71b7b..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
-
-object Local extends AkkaApp with ArgumentsParser {
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
-      "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
-        defaultValue = Some(2)))
-
-  override val description = "Start a local cluster"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    if (null != config) {
-      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
-    }
-  }
-
-  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
-    if (sameProcess) {
-      LOG.info("Starting local in same process")
-      System.setProperty("LOCAL", "true")
-    }
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
-      .asScala.flatMap(Util.parseHostList)
-    val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
-
-    if (masters.size != 1 || masters.head.host != local) {
-      LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} setting does not match " +
-        s"${Constants.GEARPUMP_HOSTNAME}: local mode requires a single master on this host")
-    } else {
-
-      val hostPort = masters.head
-      implicit val system = ActorSystem(MASTER, akkaConf.
-        withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
-      )
-
-      val master = system.actorOf(Props[MasterActor], MASTER)
-      val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
-
-      0.until(workerCount).foreach { id =>
-        system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-      }
-
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-}

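Besides the CLI entry point, the local(...) method above can be driven programmatically. A minimal sketch, assuming the default master configuration; note that the call blocks until the actor system terminates:

import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.Local

object LocalSketch extends App {
  // Equivalent to `local -workernum 4`: one in-process master, four workers.
  // Blocks on the actor system's whenTerminated future.
  Local.local(workerCount = 4, sameProcess = false, akkaConf = ClusterConfig.master())
}
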
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
deleted file mode 100644
index f1b9bdf..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.DistributedData
-import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
-import akka.cluster.{Cluster, Member, MemberStatus}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
-import org.apache.gearpump.cluster.master.Master.MasterListUpdated
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
-
-object Master extends AkkaApp with ArgumentsParser {
-
-  private var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
-      "port" -> CLIOption("<master port>", required = true))
-
-  override val description = "Start Master daemon"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    master(config.getString("ip"), config.getInt("port"), akkaConf)
-  }
-
-  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
-    masters.exists { hostPort =>
-      hostPort == s"$master:$port"
-    }
-  }
-
-  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
-
-    if (!verifyMaster(ip, port, masters)) {
-      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
-        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
-      System.exit(-1)
-    }
-
-    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
-    val quorum = masterList.size() / 2 + 1
-    val masterConfig = akkaConf.
-      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
-      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
-      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
-        ConfigValueFactory.fromAnyRef(quorum))
-
-    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
-    val system = ActorSystem(MASTER, masterConfig)
-
-    val replicator = DistributedData(system).replicator
-    LOG.info(s"Replicator path: ${replicator.path}")
-
-    // Starts singleton manager
-    val singletonManager = system.actorOf(ClusterSingletonManager.props(
-      singletonProps = Props(classOf[MasterWatcher], MASTER),
-      terminationMessage = PoisonPill,
-      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
-        .withRole(MASTER)),
-      name = SINGLETON_MANAGER)
-
-    // Start master proxy
-    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
-      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
-      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
-      // Master is created when there is a majority of machines started.
-      settings = ClusterSingletonProxySettings(system)
-        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
-      name = MASTER
-    )
-
-    LOG.info(s"master proxy is started at ${masterProxy.path}")
-
-    val mainThread = Thread.currentThread()
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        if (!system.whenTerminated.isCompleted) {
-          LOG.info("Triggering shutdown hook....")
-
-          system.stop(masterProxy)
-          val cluster = Cluster(system)
-          cluster.leave(cluster.selfAddress)
-          cluster.down(cluster.selfAddress)
-          try {
-            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-          } catch {
-            case ex: Exception => // Ignore
-          }
-          system.terminate()
-          mainThread.join()
-        }
-      }
-    })
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
-
-class MasterWatcher(role: String) extends Actor with ActorLogging {
-  import context.dispatcher
-
-  val cluster = Cluster(context.system)
-
-  val config = context.system.settings.config
-  val masters = config.getList("akka.cluster.seed-nodes")
-  val quorum = masters.size() / 2 + 1
-
-  val system = context.system
-
-  // Sorts by age, oldest first
-  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
-  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
-
-  def receive: Receive = null
-
-  // Subscribes to MemberEvent, re-subscribe when restart
-  override def preStart(): Unit = {
-    cluster.subscribe(self, classOf[MemberEvent])
-    context.become(waitForInit)
-  }
-  override def postStop(): Unit = {
-    cluster.unsubscribe(self)
-  }
-
-  def matchingRole(member: Member): Boolean = member.hasRole(role)
-
-  def waitForInit: Receive = {
-    case state: CurrentClusterState => {
-      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
-        m.status == MemberStatus.Up && matchingRole(m))
-
-      if (membersByAge.size < quorum) {
-        log.info(s"We cannot get a quorum, $quorum, " +
-          s"shutting down...${membersByAge.iterator.mkString(",")}")
-        context.become(waitForShutdown)
-        self ! MasterWatcher.Shutdown
-      } else {
-        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
-        notifyMasterMembersChange(master)
-        context.become(waitForClusterEvent(master))
-      }
-    }
-  }
-
-  def waitForClusterEvent(master: ActorRef): Receive = {
-    case MemberUp(m) if matchingRole(m) => {
-      membersByAge += m
-      notifyMasterMembersChange(master)
-    }
-    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
-      mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
-      log.info(s"member removed ${mEvent.member}")
-      val m = mEvent.member
-      membersByAge -= m
-      if (membersByAge.size < quorum) {
-        log.info(s"We cannot get a quorum, $quorum, " +
-          s"shutting down...${membersByAge.iterator.mkString(",")}")
-        context.become(waitForShutdown)
-        self ! MasterWatcher.Shutdown
-      } else {
-        notifyMasterMembersChange(master)
-      }
-    }
-  }
-
-  private def notifyMasterMembersChange(master: ActorRef): Unit = {
-    val masters = membersByAge.toList.map{ member =>
-      MasterNode(member.address.host.getOrElse("Unknown-Host"),
-        member.address.port.getOrElse(0))
-    }
-    master ! MasterListUpdated(masters)
-  }
-
-  def waitForShutdown: Receive = {
-    case MasterWatcher.Shutdown => {
-      cluster.unsubscribe(self)
-      cluster.leave(cluster.selfAddress)
-      context.stop(self)
-      system.scheduler.scheduleOnce(Duration.Zero) {
-        try {
-          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-        } catch {
-          case ex: Exception => // Ignore
-        }
-        system.terminate()
-      }
-    }
-  }
-}
-
-object MasterWatcher {
-  object Shutdown
-}
-

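Both the seed-node setting and MasterWatcher above derive the same quorum, a strict majority of the configured masters. A small worked sketch of that arithmetic (the object name is illustrative):

object QuorumSketch extends App {
  // Mirrors `masterList.size() / 2 + 1` in Master.main and MasterWatcher.
  def quorum(masterCount: Int): Int = masterCount / 2 + 1

  // 1 master  -> quorum 1 (single-node cluster)
  // 3 masters -> quorum 2 (tolerates one master down)
  // 5 masters -> quorum 3 (tolerates two masters down)
  (1 to 5).foreach(n => println(s"$n master(s) -> quorum ${quorum(n)}"))
}
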
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
deleted file mode 100644
index 58a9dec..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to start a worker daemon process */
-object Worker extends AkkaApp with ArgumentsParser {
-  protected override def akkaConfig = ClusterConfig.worker()
-
-  override val description = "Start a worker daemon"
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  private def uuid = java.util.UUID.randomUUID.toString
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val id = uuid
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
-      // Delay creation of LOG instance to avoid creating an empty log file as we
-      // reset the log file name here
-      LogUtil.getLogger(getClass)
-    }
-
-    val system = ActorSystem(id, akkaConf)
-
-    val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address =>
-      val hostAndPort = address.split(":")
-      HostPort(hostAndPort(0), hostAndPort(1).toInt)
-    }
-
-    LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...")
-    val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}")
-
-    system.actorOf(Props(classOf[WorkerActor], masterProxy),
-      classOf[WorkerActor].getSimpleName + id)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
\ No newline at end of file

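The master list above is read from gearpump.cluster.masters as host:port strings. A minimal sketch of the same parsing, assuming only the HostPort class imported above (the sample addresses are illustrative):

import org.apache.gearpump.transport.HostPort

object MasterListSketch extends App {
  val configured = List("node1:3000", "node2:3000", "node3:3000")

  // Same split(":") parsing as in Worker.main above.
  val masters = configured.map { address =>
    val hostAndPort = address.split(":")
    HostPort(hostAndPort(0), hostAndPort(1).toInt)
  }

  println(masters.mkString(","))
}
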

[2/4] incubator-gearpump git commit: [GEARPUMP-224] merge gearpump-daemon to gearpump-core

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
deleted file mode 100644
index 9a3a119..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.{Failure, Success}
-
-import akka.actor._
-import akka.pattern.ask
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
-import org.apache.gearpump.cluster.AppMasterToWorker._
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
-import org.apache.gearpump.cluster.MasterToClient._
-import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import org.apache.gearpump.cluster.master.AppManager._
-import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
-import org.apache.gearpump.cluster.master.Master._
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
-
-/**
- * AppManager is a dedicated child of the Master that manages all applications.
- */
-private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
-  extends Actor with Stash with TimeOutScheduler {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
-  private val appMasterMaxRetries: Int = 5
-  private val appMasterRetryTimeRange: Duration = 20.seconds
-
-  implicit val timeout = FUTURE_TIMEOUT
-  implicit val executionContext = context.dispatcher
-
-  // Next available appId
-  private var nextAppId: Int = 1
-
-  // From appId to appMaster data
-  // Applications not in activeAppMasters or deadAppMasters are in pending status
-  private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
-
-  // Active appMaster list where applications are in active status
-  private var activeAppMasters = Set.empty[Int]
-
-  // Dead appMaster list where applications are in inactive status
-  private var deadAppMasters = Set.empty[Int]
-
-  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
-
-  def receive: Receive = null
-
-  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
-  context.become(waitForMasterState)
-
-  def waitForMasterState: Receive = {
-    case GetKVSuccess(_, result) =>
-      val masterState = result.asInstanceOf[MasterState]
-      if (masterState != null) {
-        this.nextAppId = masterState.maxId + 1
-        this.activeAppMasters = masterState.activeAppMasters
-        this.deadAppMasters = masterState.deadAppMasters
-        this.appMasterRegistry = masterState.appMasterRegistry
-      }
-      context.become(receiveHandler)
-      unstashAll()
-    case GetKVFailed(ex) =>
-      LOG.error("Failed to get master state, shutting down master to avoid data corruption...")
-      context.parent ! PoisonPill
-    case msg =>
-      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
-      stash()
-  }
-
-  def receiveHandler: Receive = {
-    val msg = "Application Manager started. Ready for application submission..."
-    LOG.info(msg)
-    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
-      appDataStoreService orElse terminationWatch
-  }
-
-  def clientMsgHandler: Receive = {
-    case SubmitApplication(app, jar, username) =>
-      LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
-      val client = sender()
-      if (applicationNameExist(app.name)) {
-        client ! SubmitApplicationResult(Failure(
-          new Exception(s"Application name ${app.name} already existed")))
-      } else {
-        context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent,
-          Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
-
-        val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null)
-        appMasterRestartPolicies += nextAppId ->
-          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
-        kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
-        nextAppId += 1
-      }
-
-    case RestartApplication(appId) =>
-      val client = sender()
-      (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
-        case GetKVSuccess(_, result) =>
-          val appState = result.asInstanceOf[ApplicationState]
-          if (appState != null) {
-            LOG.info(s"Shutting down the application (restart), $appId")
-            self ! ShutdownApplication(appId)
-            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
-          } else {
-            client ! SubmitApplicationResult(Failure(
-              new Exception(s"Failed to restart, because the application $appId does not exist.")
-            ))
-          }
-        case GetKVFailed(ex) =>
-          client ! SubmitApplicationResult(Failure(
-            new Exception(s"Unable to obtain the Master State. " +
-              s"Application $appId will not be restarted.")
-          ))
-      }
-
-    case ShutdownApplication(appId) =>
-      LOG.info(s"App Manager Shutting down application $appId")
-      val (_, appInfo) = appMasterRegistry.get(appId)
-        .filter { case (_, info) => !deadAppMasters.contains(info.appId)}
-        .getOrElse((null, null))
-      Option(appInfo) match {
-        case Some(info) =>
-          val worker = info.worker
-          val workerPath = Option(worker).map(_.path).orNull
-          LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID")
-          cleanApplicationData(appId)
-          val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
-            s"AppMaster $appId shutdown requested by master...")
-          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
-          sender ! ShutdownApplicationResult(Success(appId))
-        case None =>
-          val errorMsg = s"Failed to find registration information for appId: $appId"
-          LOG.error(errorMsg)
-          sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
-      }
-
-    case ResolveAppId(appId) =>
-      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
-      if (null != appMaster) {
-        sender ! ResolveAppIdResult(Success(appMaster))
-      } else {
-        sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
-      }
-
-    case AppMastersDataRequest =>
-      val appMastersData = collection.mutable.ListBuffer[AppMasterData]()
-      appMasterRegistry.foreach(pair => {
-        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
-        val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
-        val workerPath = Option(info.worker).map(worker =>
-          ActorUtil.getFullPath(context.system, worker.path))
-        val status = getAppMasterStatus(id)
-        appMastersData += AppMasterData(
-          status, id, info.appName, appMasterPath, workerPath.orNull,
-          info.submissionTime, info.startTime, info.finishTime, info.user)
-      })
-
-      sender ! AppMastersData(appMastersData.toList)
-
-    case QueryAppMasterConfig(appId) =>
-      val config =
-        if (appMasterRegistry.contains(appId)) {
-          val (_, info) = appMasterRegistry(appId)
-          info.config
-        } else {
-          null
-        }
-      sender ! AppMasterConfig(config)
-
-    case appMasterDataRequest: AppMasterDataRequest =>
-      val appId = appMasterDataRequest.appId
-      val appStatus = getAppMasterStatus(appId)
-
-      appStatus match {
-        case AppMasterNonExist =>
-          sender ! AppMasterData(AppMasterNonExist)
-        case _ =>
-          val (appMaster, info) = appMasterRegistry(appId)
-          val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
-          val workerPath = Option(info.worker).map(
-            worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
-          sender ! AppMasterData(
-            appStatus, appId, info.appName, appMasterPath, workerPath,
-            info.submissionTime, info.startTime, info.finishTime, info.user)
-      }
-  }
-
-  def workerMessage: Receive = {
-    case ShutdownExecutorSucceed(appId, executorId) =>
-      LOG.info(s"Shut down executor $executorId for application $appId successfully")
-    case failed: ShutdownExecutorFailed =>
-      LOG.error(failed.reason)
-  }
-
-  private def getAppMasterStatus(appId: Int): AppMasterStatus = {
-    if (activeAppMasters.contains(appId)) {
-      AppMasterActive
-    } else if (deadAppMasters.contains(appId)) {
-      AppMasterInActive
-    } else if (appMasterRegistry.contains(appId)) {
-      AppMasterPending
-    } else {
-      AppMasterNonExist
-    }
-  }
-
-  private def shutDownExecutorTimeOut(): Unit = {
-    LOG.error(s"Shut down executor time out")
-  }
-
-  def appMasterMessage: Receive = {
-    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
-      val startTime = System.currentTimeMillis()
-      val register = registerBack.copy(startTime = startTime)
-
-      LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
-      context.watch(appMaster)
-      appMasterRegistry += register.appId -> (appMaster, register)
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-      sender ! AppMasterRegistered(register.appId)
-
-    case ActivateAppMaster(appId) =>
-      LOG.info(s"Activate AppMaster for app $appId")
-      activeAppMasters += appId
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-      sender ! AppMasterActivated(appId)
-  }
-
-  def appDataStoreService: Receive = {
-    case SaveAppData(appId, key, value) =>
-      val client = sender()
-      (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
-        case PutKVSuccess =>
-          client ! AppDataSaved
-        case PutKVFailed(k, ex) =>
-          client ! SaveAppDataFailed
-      }
-    case GetAppData(appId, key) =>
-      val client = sender()
-      (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
-        case GetKVSuccess(privateKey, value) =>
-          client ! GetAppDataResult(key, value)
-        case GetKVFailed(ex) =>
-          client ! GetAppDataResult(key, null)
-      }
-  }
-
-  def terminationWatch: Receive = {
-    case terminate: Terminated =>
-      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
-        s"network down: ${terminate.getAddressTerminated}")
-
-      // Now we assume that the only normal way to stop the application is submitting a
-      // ShutdownApplication request
-      val application = appMasterRegistry.find { appInfo =>
-        val (_, (actorRef, _)) = appInfo
-        actorRef.compareTo(terminate.actor) == 0
-      }
-      if (application.nonEmpty) {
-        val appId = application.get._1
-        (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
-          case GetKVSuccess(_, result) =>
-            val appState = result.asInstanceOf[ApplicationState]
-            if (appState != null) {
-              LOG.info(s"Recovering application, $appId")
-              self ! RecoverApplication(appState)
-            } else {
-              LOG.error(s"Cannot find application state for $appId")
-            }
-          case GetKVFailed(ex) =>
-            LOG.error(s"Cannot find master state to recover")
-        }
-      }
-  }
-
-  def selfMsgHandler: Receive = {
-    case RecoverApplication(state) =>
-      val appId = state.appId
-      if (appMasterRestartPolicies(appId).allowRestart) {
-        LOG.info(s"AppManager Recovering Application $appId...")
-        activeAppMasters -= appId
-        kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-          MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-        context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username,
-          context.parent, None), s"launcher${appId}_${Util.randInt()}")
-      } else {
-        LOG.error(s"Application $appId failed too many times")
-      }
-  }
-
-  case class RecoverApplication(applicationStatus: ApplicationState)
-
-  private def cleanApplicationData(appId: Int): Unit = {
-    if (appMasterRegistry.contains(appId)) {
-      // Add the dead app to dead appMasters
-      deadAppMasters += appId
-      // Remove the dead app from active appMasters
-      activeAppMasters -= appId
-
-      appMasterRegistry += appId -> {
-        val (ref, info) = appMasterRegistry(appId)
-        (ref, info.copy(finishTime = System.currentTimeMillis()))
-      }
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-      kvService ! DeleteKVGroup(appId.toString)
-    }
-  }
-
-  private def applicationNameExist(appName: String): Boolean = {
-    appMasterRegistry.values.exists { case (_, info) =>
-      info.appName == appName && !deadAppMasters.contains(info.appId)
-    }
-  }
-}
-
-object AppManager {
-  final val APP_STATE = "app_state"
-  // The id is used in KVStore
-  final val MASTER_STATE = "master_state"
-
-  case class MasterState(
-      maxId: Int,
-      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
-      activeAppMasters: Set[Int],
-      deadAppMasters: Set[Int])
-}

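AppManager above keeps three views of an application (the registry plus the active and dead sets) and derives a status from their precedence. A pure-function sketch of getAppMasterStatus, assuming the status values resolve through the MasterToAppMaster wildcard import used above (the object and argument names are illustrative):

import org.apache.gearpump.cluster.MasterToAppMaster._

object AppStatusSketch extends App {
  // Mirrors AppManager.getAppMasterStatus: precedence is
  // active > dead > registered-but-pending > unknown.
  def status(appId: Int,
      active: Set[Int],
      dead: Set[Int],
      registered: Set[Int]): AppMasterStatus = {
    if (active.contains(appId)) AppMasterActive
    else if (dead.contains(appId)) AppMasterInActive
    else if (registered.contains(appId)) AppMasterPending
    else AppMasterNonExist
  }

  println(status(1, active = Set(1), dead = Set.empty, registered = Set(1)))
  println(status(2, active = Set.empty, dead = Set(2), registered = Set(2)))
}
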
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
deleted file mode 100644
index 3e54214..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.cluster.Cluster
-import akka.cluster.ddata.Replicator._
-import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.LogUtil
-
-/**
- * A simple replicated in-memory KV service. Replicas are stored on all masters.
- */
-class InMemoryKVService extends Actor with Stash {
-  import org.apache.gearpump.cluster.master.InMemoryKVService._
-
-  private val KV_SERVICE = "gearpump_kvservice"
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val replicator = DistributedData(context.system).replicator
-  private implicit val cluster = Cluster(context.system)
-
-  // Optimized for the write path; we can tolerate one master being down during recovery.
-  private val timeout = Duration(15, TimeUnit.SECONDS)
-  private val readMajority = ReadMajority(timeout)
-  private val writeMajority = WriteMajority(timeout)
-
-  private def groupKey(group: String): LWWMapKey[Any] = {
-    LWWMapKey[Any](KV_SERVICE + "_" + group)
-  }
-
-  def receive: Receive = kvService
-
-  def kvService: Receive = {
-
-    case GetKV(group: String, key: String) =>
-      val request = Request(sender(), key)
-      replicator ! Get(groupKey(group), readMajority, Some(request))
-    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      val appData = success.get(group)
-      LOG.info(s"Successfully retrived group: ${group.id}")
-      request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
-    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      LOG.info(s"We cannot find group $group")
-      request.client ! GetKVSuccess(request.key, null)
-    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      val error = s"Failed to get application data, the request key is ${request.key}"
-      LOG.error(error)
-      request.client ! GetKVFailed(new Exception(error))
-
-    case PutKV(group: String, key: String, value: Any) =>
-      val request = Request(sender(), key)
-      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
-        map + (key -> value)
-      }
-      replicator ! update
-    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      request.client ! PutKVSuccess
-    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
-      request.client ! PutKVFailed(request.key, new Exception(error, cause))
-    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      request.client ! PutKVFailed(request.key, new TimeoutException())
-
-    case delete@DeleteKVGroup(group: String) =>
-      replicator ! Delete(groupKey(group), writeMajority)
-    case DeleteSuccess(group) =>
-      LOG.info(s"KV Group ${group.id} is deleted")
-    case ReplicationDeleteFailure(group) =>
-      LOG.error(s"Failed to delete KV Group ${group.id}...")
-    case DataDeleted(group) =>
-      LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
-  }
-}
-
-object InMemoryKVService {
-  /**
-   * KV Service related
-   */
-  case class GetKV(group: String, key: String)
-
-  trait GetKVResult
-
-  case class GetKVSuccess(key: String, value: Any) extends GetKVResult
-
-  case class GetKVFailed(ex: Throwable) extends GetKVResult
-
-  case class PutKV(group: String, key: String, value: Any)
-
-  case class DeleteKVGroup(group: String)
-
-  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
-
-  trait PutKVResult
-
-  case object PutKVSuccess extends PutKVResult
-
-  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
-
-  case class Request(client: ActorRef, key: String)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
deleted file mode 100644
index 6b4df07..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import java.lang.management.ManagementFactory
-import org.apache.gearpump.cluster.worker.WorkerId
-import org.apache.gearpump.jarstore.JarStoreServer
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-
-import akka.actor._
-import akka.remote.DisassociatedEvent
-import com.typesafe.config.Config
-import org.apache.commons.lang.exception.ExceptionUtils
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster._
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.MasterToAppMaster._
-import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
-import org.apache.gearpump.cluster.MasterToWorker._
-import org.apache.gearpump.cluster.WorkerToMaster._
-import org.apache.gearpump.cluster.master.InMemoryKVService._
-import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.metrics.Metrics.ReportMetrics
-import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import org.apache.gearpump.util._
-
-/**
- * Master actor that manages the resources of the whole cluster.
- * It plays a role similar to YARN's ResourceManager.
- */
-private[cluster] class Master extends Actor with Stash {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val systemConfig: Config = context.system.settings.config
-  private implicit val timeout = Constants.FUTURE_TIMEOUT
-  private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
-  // Resources and resource requests can be dynamically reconstructed from the
-  // heartbeats of workers and appmasters when the master singleton is migrated,
-  // so we don't need to persist them in the cluster.
-  private var appManager: ActorRef = null
-
-  private var scheduler: ActorRef = null
-
-  private var workers = new immutable.HashMap[ActorRef, WorkerId]
-
-  private val birth = System.currentTimeMillis()
-
-  private var nextWorkerId = 0
-
-  def receive: Receive = null
-
-  // Register jvm metrics
-  Metrics(context.system).register(new JvmMetricsSet("master"))
-
-  LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
-
-  val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
-
-  private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
-
-  private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
-
-  // Maintain the list of active masters.
-  private var masters: List[MasterNode] = {
-    // Add myself into the list of initial masters.
-    List(MasterNode(hostPort.host, hostPort.port))
-  }
-
-  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-
-  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-  val historyMetricsService = if (metricsEnabled) {
-    val historyMetricsService = {
-      context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
-    }
-
-    val metricsReportService = context.actorOf(
-      Props(new MetricsReporterService(Metrics(context.system))))
-    historyMetricsService.tell(ReportMetrics, metricsReportService)
-    Some(historyMetricsService)
-  } else {
-    None
-  }
-
-  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
-  context.become(waitForNextWorkerId)
-
-  def waitForNextWorkerId: Receive = {
-    case GetKVSuccess(_, result) =>
-      if (result != null) {
-        this.nextWorkerId = result.asInstanceOf[Int]
-      } else {
-        LOG.warn("Cannot find existing state in the distributed cluster...")
-      }
-      context.become(receiveHandler)
-      unstashAll()
-    case GetKVFailed(ex) =>
-      LOG.error("Failed to get worker id, shutting down master to avoid data corruption...")
-      context.parent ! PoisonPill
-    case msg =>
-      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
-      stash()
-  }
-
-  def receiveHandler: Receive = workerMsgHandler orElse
-    appMasterMsgHandler orElse
-    onMasterListChange orElse
-    clientMsgHandler orElse
-    metricsService orElse
-    jarStoreService orElse
-    terminationWatch orElse
-    disassociated orElse
-    kvServiceMsgHandler orElse
-    ActorUtil.defaultMsgHandler(self)
-
-  def workerMsgHandler: Receive = {
-    case RegisterNewWorker =>
-      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
-      nextWorkerId += 1
-      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
-      val workerHostname = ActorUtil.getHostname(sender())
-      LOG.info(s"Register new from $workerHostname ....")
-      self forward RegisterWorker(workerId)
-
-    case RegisterWorker(id) =>
-      context.watch(sender())
-      sender ! WorkerRegistered(id, MasterInfo(self, birth))
-      scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
-      workers += (sender() -> id)
-      val workerHostname = ActorUtil.getHostname(sender())
-      LOG.info(s"Register Worker with id $id from $workerHostname ....")
-    case resourceUpdate: ResourceUpdate =>
-      scheduler forward resourceUpdate
-  }
-
-  def jarStoreService: Receive = {
-    case GetJarStoreServer =>
-      jarStore forward GetJarStoreServer
-  }
-
-  def kvServiceMsgHandler: Receive = {
-    case PutKVSuccess =>
-    // Skip
-    case PutKVFailed(key, exception) =>
-      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
-        ExceptionUtils.getStackTrace(exception))
-  }
-
-  def metricsService: Receive = {
-    case query: QueryHistoryMetrics =>
-      if (historyMetricsService.isEmpty) {
-        // Returns empty metrics so that we don't hang the UI
-        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
-      } else {
-        historyMetricsService.get forward query
-      }
-  }
-
-  def appMasterMsgHandler: Receive = {
-    case request: RequestResource =>
-      scheduler forward request
-    case registerAppMaster: RegisterAppMaster =>
-      appManager forward registerAppMaster
-    case activateAppMaster: ActivateAppMaster =>
-      appManager forward activateAppMaster
-    case save: SaveAppData =>
-      appManager forward save
-    case get: GetAppData =>
-      appManager forward get
-    case GetAllWorkers =>
-      sender ! WorkerList(workers.values.toList)
-    case GetMasterData =>
-      val aliveFor = System.currentTimeMillis() - birth
-      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
-      val userDir = System.getProperty("user.dir")
-
-      val masterDescription =
-        MasterSummary(
-          MasterNode(hostPort.host, hostPort.port),
-          masters,
-          aliveFor,
-          logFileDir,
-          jarStoreRootPath,
-          MasterStatus.Synced,
-          userDir,
-          List.empty[MasterActivity],
-          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
-          historyMetricsConfig = getHistoryMetricsConfig
-        )
-
-      sender ! MasterData(masterDescription)
-
-    case invalidAppMaster: InvalidAppMaster =>
-      appManager forward invalidAppMaster
-  }
-
-  import scala.util.{Failure, Success}
-
-  def onMasterListChange: Receive = {
-    case MasterListUpdated(masters: List[MasterNode]) =>
-      this.masters = masters
-  }
-
-  def clientMsgHandler: Receive = {
-    case app: SubmitApplication =>
-      LOG.debug(s"Receive from client, SubmitApplication $app")
-      appManager.forward(app)
-    case app: RestartApplication =>
-      LOG.debug(s"Receive from client, RestartApplication $app")
-      appManager.forward(app)
-    case app: ShutdownApplication =>
-      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
-      scheduler ! ApplicationFinished(app.appId)
-      appManager.forward(app)
-    case app: ResolveAppId =>
-      LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
-      appManager.forward(app)
-    case resolve: ResolveWorkerId =>
-      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
-      val worker = workers.find(_._2 == resolve.workerId)
-      worker match {
-        case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
-        case None => sender ! ResolveWorkerIdResult(Failure(
-          new Exception(s"cannot find worker ${resolve.workerId}")))
-      }
-    case AppMastersDataRequest =>
-      LOG.debug("Master received AppMastersDataRequest")
-      appManager forward AppMastersDataRequest
-    case appMasterDataRequest: AppMasterDataRequest =>
-      LOG.debug("Master received AppMasterDataRequest")
-      appManager forward appMasterDataRequest
-    case query: QueryAppMasterConfig =>
-      LOG.debug("Master received QueryAppMasterConfig")
-      appManager forward query
-    case QueryMasterConfig =>
-      sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-  }
-
-  def disassociated: Receive = {
-    case disassociated: DisassociatedEvent =>
-      LOG.info(s" disassociated ${disassociated.remoteAddress}")
-  }
-
-  def terminationWatch: Receive = {
-    case t: Terminated =>
-      val actor = t.actor
-      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" +
-        t.getAddressTerminated())
-
-      LOG.info("Let's filter out dead resources...")
-      // Filters out dead worker resource
-      if (workers.keySet.contains(actor)) {
-        scheduler ! WorkerTerminated(workers(actor))
-        workers -= actor
-      }
-  }
-
-  override def preStart(): Unit = {
-    val path = ActorUtil.getFullPath(context.system, self.path)
-    LOG.info(s"master path is $path")
-    val schedulerClass = Class.forName(
-      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
-
-    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
-      classOf[AppManager].getSimpleName)
-    scheduler = context.actorOf(Props(schedulerClass))
-    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
-  }
-}
-
-object Master {
-  final val MASTER_GROUP = "master_group"
-
-  final val WORKER_ID = "next_worker_id"
-
-  case class WorkerTerminated(workerId: WorkerId)
-
-  case class MasterInfo(master: ActorRef, startTime: Long = 0L)
-
-  /** Notify the subscriber that master actor list has been updated */
-  case class MasterListUpdated(masters: List[MasterNode])
-
-  object MasterInfo {
-    def empty: MasterInfo = MasterInfo(null)
-  }
-
-  case class SlotStatus(totalSlots: Int, availableSlots: Int)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
deleted file mode 100644
index 1429694..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.collection.mutable
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
-import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import org.apache.gearpump.cluster.scheduler.Relaxation._
-import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
-
-/** Assign resource to application based on the priority of the application */
-class PriorityScheduler extends Scheduler {
-  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
-
-  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
-    override def compare(x: PendingRequest, y: PendingRequest): Int = {
-      var res = x.request.priority.id - y.request.priority.id
-      if (res == 0) {
-        res = y.timeStamp.compareTo(x.timeStamp)
-      }
-      res
-    }
-  }
-
-  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
-
-  override def allocateResource(): Unit = {
-    var scheduleLater = Array.empty[PendingRequest]
-    val resourcesSnapShot = resources.clone()
-    var allocated = Resource.empty
-    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
-
-    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
-      val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
-      request.relaxation match {
-        case ANY =>
-          val allocations = allocateFairly(resourcesSnapShot, request)
-          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
-          if (allocations.nonEmpty) {
-            appMaster ! ResourceAllocated(allocations.toArray)
-          }
-          if (newAllocated < request.resource) {
-            val remainingRequest = request.resource - newAllocated
-            val remainingExecutors = request.executorNum - allocations.length
-            val newResourceRequest = request.copy(resource = remainingRequest,
-              executorNum = remainingExecutors)
-            scheduleLater = scheduleLater :+
-              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
-          }
-          allocated = allocated + newAllocated
-        case ONEWORKER =>
-          val availableResource = resourcesSnapShot.find { params =>
-            val (_, (_, resource)) = params
-            resource > request.resource
-          }
-          if (availableResource.nonEmpty) {
-            val (workerId, (worker, resource)) = availableResource.get
-            allocated = allocated + request.resource
-            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
-              workerId)))
-            resourcesSnapShot.update(workerId, (worker, resource - request.resource))
-          } else {
-            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
-          }
-        case SPECIFICWORKER =>
-          val workerAndResource = resourcesSnapShot.get(request.workerId)
-          if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
-            val (worker, availableResource) = workerAndResource.get
-            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
-              request.workerId)))
-            allocated = allocated + request.resource
-            resourcesSnapShot.update(request.workerId, (worker,
-              availableResource - request.resource))
-          } else {
-            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
-          }
-      }
-    }
-    for (request <- scheduleLater)
-      resourceRequests.enqueue(request)
-  }
-
-  def resourceRequestHandler: Receive = {
-    case RequestResource(appId, request) =>
-      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
-        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
-      val appMaster = sender()
-      resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
-        System.currentTimeMillis()))
-      allocateResource()
-  }
-
-  override def doneApplication(appId: Int): Unit = {
-    resourceRequests = resourceRequests.filter(_.appId != appId)
-  }
-
-  private def allocateFairly(
-      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
-    : List[ResourceAllocation] = {
-    val workerNum = resources.size
-    var allocations = List.empty[ResourceAllocation]
-    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
-    var remainingRequest = request.resource
-    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
-
-    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
-      val executorNum = Math.min(workerNum, remainingExecutors)
-      val toRequest = Resource(remainingRequest.slots * executorNum / remainingExecutors)
-
-      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
-      val pickedResources = sortedResources.take(executorNum)
-
-      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
-        val ((workerId, (worker, resource)), index) = workerWithIndex
-        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
-      }.sortBy(_._2).map(_._1)
-
-      if (flattenResource.length < toRequest.slots) {
-        // Cannot satisfy the user's requirements
-        totalAvailable = Resource.empty
-      } else {
-        flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
-          toArray.foreach { params =>
-          val ((workerId, worker), slots) = params
-          resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
-          allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
-        }
-        totalAvailable -= toRequest
-        remainingRequest -= toRequest
-        remainingExecutors -= executorNum
-      }
-    }
-    allocations
-  }
-}
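
For reference, the ordering above makes the queue dequeue higher-priority requests first and, on ties, older requests first (mutable.PriorityQueue is a max-heap). Below is a minimal, self-contained sketch of the same comparator; Req and the sample values are illustrative stand-ins, not gearpump types.

import scala.collection.mutable

object OrderingSketch extends App {
  // Simplified stand-in for ResourceRequest/PendingRequest.
  case class Req(priorityId: Int, timeStamp: Long)

  // Same comparison as PriorityScheduler.requestOrdering:
  // a higher priority id wins; ties go to the earlier timestamp.
  val ordering = new Ordering[Req] {
    override def compare(x: Req, y: Req): Int = {
      var res = x.priorityId - y.priorityId
      if (res == 0) {
        res = y.timeStamp.compareTo(x.timeStamp)
      }
      res
    }
  }

  val queue = new mutable.PriorityQueue[Req]()(ordering)
  queue += Req(priorityId = 1, timeStamp = 200L)
  queue += Req(priorityId = 2, timeStamp = 300L)
  queue += Req(priorityId = 1, timeStamp = 100L)

  // Dequeues: (2, 300), then (1, 100), then (1, 200).
  while (queue.nonEmpty) println(queue.dequeue())
}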

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
deleted file mode 100644
index 7187c1a..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.collection.mutable
-
-import akka.actor.{Actor, ActorRef}
-import org.slf4j.Logger
-
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
-import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
-import org.apache.gearpump.cluster.master.Master.WorkerTerminated
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Schedules resources for different applications.
- */
-abstract class Scheduler extends Actor {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
-
-  def handleScheduleMessage: Receive = {
-    case WorkerRegistered(id, _) =>
-      if (!resources.contains(id)) {
-        LOG.info(s"Worker $id added to the scheduler")
-        resources.put(id, (sender, Resource.empty))
-      }
-    case update@ResourceUpdate(worker, workerId, resource) =>
-      LOG.info(s"$update...")
-      if (resources.contains(workerId)) {
-        val resourceReturned = resource > resources.get(workerId).get._2
-        resources.update(workerId, (worker, resource))
-        if (resourceReturned) {
-          allocateResource()
-        }
-        sender ! UpdateResourceSucceed
-      }
-      else {
-        sender ! UpdateResourceFailed(
-          s"ResourceUpdate failed! The worker $workerId has not been registered into master")
-      }
-    case WorkerTerminated(workerId) =>
-      if (resources.contains(workerId)) {
-        resources -= workerId
-      }
-    case ApplicationFinished(appId) =>
-      doneApplication(appId)
-  }
-
-  def allocateResource(): Unit
-
-  def doneApplication(appId: Int): Unit
-}
-
-object Scheduler {
-  case class PendingRequest(
-      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
-
-  case class ApplicationFinished(appId: Int)
-}
\ No newline at end of file
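
The base class above is a template: handleScheduleMessage keeps the worker/resource bookkeeping, while allocateResource and doneApplication are the policy hooks a concrete scheduler (such as PriorityScheduler) fills in. A minimal sketch of that shape, with plain stand-in types instead of the Akka and gearpump ones (illustrative only):

import scala.collection.mutable

abstract class SchedulerSketch {
  // workerId -> free slots, mirroring Scheduler.resources
  protected val resources = mutable.HashMap.empty[Int, Int]

  // Shared bookkeeping, as in handleScheduleMessage.
  def workerRegistered(workerId: Int): Unit = {
    if (!resources.contains(workerId)) resources.put(workerId, 0)
  }

  def resourceUpdate(workerId: Int, freeSlots: Int): Unit = {
    val returned = resources.get(workerId).exists(freeSlots > _)
    resources.update(workerId, freeSlots)
    if (returned) allocateResource() // retry only when capacity grew
  }

  def workerTerminated(workerId: Int): Unit = resources -= workerId

  // Policy hooks implemented by concrete schedulers.
  def allocateResource(): Unit
  def doneApplication(appId: Int): Unit
}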

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
deleted file mode 100644
index b4e6f9e..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.worker
-
-import java.io.File
-
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
-
-/** Launcher to start an executor process */
-class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  override def createProcess(
-      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
-      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
-
-    LOG.info(s"Launch executor $executorId, classpath: ${classPath.mkString(File.pathSeparator)}")
-    Util.startProcess(options, classPath, mainClass, arguments)
-  }
-
-  override def cleanProcess(appId: Int, executorId: Int): Unit = {}
-}
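
The concrete launcher class is not referenced directly: the worker resolves it reflectively from config (see getExecutorProcLauncher in Worker.scala below). A minimal sketch of that lookup, using a made-up config key and stand-in types:

import com.typesafe.config.{Config, ConfigFactory}

trait LauncherSketch { def config: Config }
class DefaultLauncherSketch(val config: Config) extends LauncherSketch

object LauncherLookup extends App {
  // "example.launcher-class" is a hypothetical key for this sketch.
  val config = ConfigFactory.parseString(
    "example.launcher-class = DefaultLauncherSketch")

  // Same reflective pattern as Worker.getExecutorProcLauncher:
  // load the class by name and call its (Config) constructor.
  val clazz = Class.forName(config.getString("example.launcher-class"))
  val launcher = clazz.getConstructor(classOf[Config])
    .newInstance(config).asInstanceOf[LauncherSketch]
  println(launcher.getClass.getName)
}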

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
deleted file mode 100644
index 1b52e5d..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
+++ /dev/null
@@ -1,581 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.worker
-
-import java.io.File
-import java.lang.management.ManagementFactory
-import java.net.URL
-import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
-
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.SupervisorStrategy.Stop
-import akka.actor._
-import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import org.apache.gearpump.cluster.AppMasterToWorker._
-import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
-import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
-import org.apache.gearpump.cluster.MasterToWorker._
-import org.apache.gearpump.cluster.WorkerToAppMaster._
-import org.apache.gearpump.cluster.WorkerToMaster._
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
-import org.apache.gearpump.metrics.Metrics.ReportMetrics
-import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import org.apache.gearpump.util.ActorSystemBooter.Daemon
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import org.apache.gearpump.util.{TimeOutScheduler, _}
-
-/**
- * A Worker tracks the resources on a single machine; it is similar to
- * the node manager in YARN.
- *
- * @param masterProxy masterProxy is used to resolve the master
- */
-private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
-  private val systemConfig: Config = context.system.settings.config
-
-  private val address = ActorUtil.getFullPath(context.system, self.path)
-  private var resource = Resource.empty
-  private var allocatedResources = Map[ActorRef, Resource]()
-  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
-  private var id: WorkerId = WorkerId.unspecified
-  private val createdTime = System.currentTimeMillis()
-  private var masterInfo: MasterInfo = null
-  private var executorNameToActor = Map.empty[String, ActorRef]
-  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
-  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
-
-  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
-  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
-
-  private var totalSlots: Int = 0
-
-  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-  var historyMetricsService: Option[ActorRef] = None
-
-  override def receive: Receive = null
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  def service: Receive =
-    appMasterMsgHandler orElse
-      clientMessageHandler orElse
-      metricsService orElse
-      terminationWatch(masterInfo.master) orElse
-      ActorUtil.defaultMsgHandler(self)
-
-  def metricsService: Receive = {
-    case query: QueryHistoryMetrics =>
-      if (historyMetricsService.isEmpty) {
-        // Returns empty metrics so that we don't hang the UI
-        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
-      } else {
-        historyMetricsService.get forward query
-      }
-  }
-
-  private var metricsInitialized = false
-
-  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-
-  private def initializeMetrics(): Unit = {
-    // Registers jvm metrics
-    val metricsSetName = "worker" + WorkerId.render(id)
-    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
-
-    historyMetricsService = if (metricsEnabled) {
-      val historyMetricsService = {
-        context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
-      }
-
-      val metricsReportService = context.actorOf(Props(
-        new MetricsReporterService(Metrics(context.system))))
-      historyMetricsService.tell(ReportMetrics, metricsReportService)
-      Some(historyMetricsService)
-    } else {
-      None
-    }
-  }
-
-  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
-
-    // If the master gets disconnected, WorkerRegistered may be triggered multiple times.
-    case WorkerRegistered(id, masterInfo) =>
-      this.id = id
-
-      // Checks the flag so that we don't re-initialize the metrics when the
-      // worker re-registers itself.
-      if (!metricsInitialized) {
-        initializeMetrics()
-        metricsInitialized = true
-      }
-
-      this.masterInfo = masterInfo
-      timeoutTicker.cancel()
-      context.watch(masterInfo.master)
-      this.LOG = LogUtil.getLogger(getClass, worker = id)
-      LOG.info(s"Worker is registered. " +
-        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
-      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
-        resourceUpdateTimeoutMs, updateResourceTimeOut())
-      context.become(service)
-  }
-
-  private def updateResourceTimeOut(): Unit = {
-    LOG.error(s"Update worker resource time out")
-  }
-
-  def appMasterMsgHandler: Receive = {
-    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
-      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
-      val executorToStop = executorNameToActor.get(actorName)
-      if (executorToStop.isDefined) {
-        LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
-          s"due to: $reason")
-        executorToStop.get.forward(shutdown)
-      } else {
-        LOG.error(s"Cannot find executor $actorName, ignore this message")
-        sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
-      }
-    case launch: LaunchExecutor =>
-      LOG.info(s"$launch")
-      if (resource < launch.resource) {
-        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
-      } else {
-        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
-
-        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
-          jarStoreClient, executorProcLauncher))
-        executorNameToActor += actorName -> executor
-
-        resource = resource - launch.resource
-        allocatedResources = allocatedResources + (executor -> launch.resource)
-
-        reportResourceToMaster()
-        executorsInfo += executor ->
-          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
-        context.watch(executor)
-      }
-    case UpdateResourceFailed(reason, ex) =>
-      LOG.error(reason)
-      context.stop(self)
-    case UpdateResourceSucceed =>
-      LOG.info(s"Update resource succeed")
-    case GetWorkerData(workerId) =>
-      val aliveFor = System.currentTimeMillis() - createdTime
-      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
-      val userDir = System.getProperty("user.dir")
-      sender ! WorkerData(WorkerSummary(
-        id, "active",
-        address,
-        aliveFor,
-        logDir,
-        executorsInfo.values.toArray,
-        totalSlots,
-        resource.slots,
-        userDir,
-        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
-        resourceManagerContainerId = systemConfig.getString(
-          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
-        historyMetricsConfig = getHistoryMetricsConfig)
-      )
-    case ChangeExecutorResource(appId, executorId, usedResource) =>
-      for (executor <- executorActorRef(appId, executorId);
-        allocatedResource <- allocatedResources.get(executor)) {
-
-        allocatedResources += executor -> usedResource
-        resource = resource + allocatedResource - usedResource
-        reportResourceToMaster()
-
-        if (usedResource == Resource(0)) {
-          executorsInfo -= executor
-          allocatedResources -= executor
-          // Stops the executor if no resource is bound to it.
-          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
-          executor ! ShutdownExecutor(appId, executorId,
-            "Shutdown executor because the resource used is zero")
-        }
-      }
-  }
-
-  private def reportResourceToMaster(): Unit = {
-    sendMsgWithTimeOutCallBack(masterInfo.master,
-      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
-  }
-
-  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
-    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
-    executorNameToActor.get(actorName)
-  }
-
-  def clientMessageHandler: Receive = {
-    case QueryWorkerConfig(workerId) =>
-      if (this.id == workerId) {
-        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-      } else {
-        sender ! WorkerConfig(ConfigFactory.empty)
-      }
-  }
-
-  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
-    repeatActionUtil(
-      seconds = timeOutSeconds,
-      action = () => {
-        masterProxy ! RegisterWorker(workerId)
-      },
-      onTimeout = () => {
-        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
-          s"seconds, abort and kill the worker...")
-        self ! PoisonPill
-      })
-  }
-
-  def terminationWatch(master: ActorRef): Receive = {
-    case Terminated(actor) =>
-      if (actor.compareTo(master) == 0) {
-        // The parent master is down. Try to find a new master; if none can be
-        // found within the timeout, the worker kills itself to free resources.
-        LOG.info(s"Master cannot be contacted, looking for a new master ...")
-        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
-      } else if (ActorUtil.isChildActorPath(self, actor)) {
-        // One executor is down.
-        LOG.info(s"Executor is down ${getExecutorName(actor)}")
-
-        val allocated = allocatedResources.get(actor)
-        if (allocated.isDefined) {
-          resource = resource + allocated.get
-          executorsInfo -= actor
-          allocatedResources = allocatedResources - actor
-          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
-            resourceUpdateTimeoutMs, updateResourceTimeOut())
-        }
-      }
-  }
-
-  private def getExecutorName(actorRef: ActorRef): Option[String] = {
-    executorNameToActor.find(_._2 == actorRef).map(_._1)
-  }
-
-  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
-    val launcherClazz = Class.forName(
-      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
-    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
-      .asInstanceOf[ExecutorProcessLauncher]
-  }
-
-  import context.dispatcher
-  override def preStart(): Unit = {
-    LOG.info(s"RegisterNewWorker")
-    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
-    this.resource = Resource(totalSlots)
-    masterProxy ! RegisterNewWorker
-    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
-  }
-
-  private def registerTimeoutTicker(seconds: Int): Cancellable = {
-    repeatActionUtil(seconds, () => Unit, () => {
-      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
-        s"abort and kill the worker...")
-      self ! PoisonPill
-    })
-  }
-
-  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
-    : Cancellable = {
-    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
-      Duration(2, TimeUnit.SECONDS))(action())
-    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
-    new Cancellable {
-      def cancel(): Boolean = {
-        val result1 = cancelTimeout.cancel()
-        val result2 = cancelSuicide.cancel()
-        result1 && result2
-      }
-
-      def isCancelled: Boolean = {
-        cancelTimeout.isCancelled && cancelSuicide.isCancelled
-      }
-    }
-  }
-
-  override def postStop(): Unit = {
-    LOG.info(s"Worker is going down....")
-    ioPool.shutdown()
-    context.system.terminate()
-  }
-}
-
-private[cluster] object Worker {
-
-  case class ExecutorResult(result: Try[Int])
-
-  class ExecutorWatcher(
-      launch: LaunchExecutor,
-      masterInfo: MasterInfo,
-      ioPool: ExecutionContext,
-      jarStoreClient: JarStoreClient,
-      procLauncher: ExecutorProcessLauncher) extends Actor {
-    import launch.{appId, executorId, resource}
-
-    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
-
-    val executorConfig: Config = {
-      val workerConfig = context.system.settings.config
-
-      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
-        Option(jvmConfig.executorAkkaConfig)
-      }.getOrElse(ConfigFactory.empty())
-
-      resolveExecutorConfig(workerConfig, submissionConfig)
-    }
-
-    // For some configs the worker config takes priority; for others, the user's
-    // application submission config takes priority.
-    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
-      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
-        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
-        .withoutPath(GEARPUMP_HOME)
-        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
-        .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
-        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
-        // Falls back to workerConfig
-        .withFallback(workerConfig)
-
-      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
-      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
-      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
-        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
-        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
-      } else {
-        config
-      }
-
-      // Excludes reference.conf and JVM properties.
-      ClusterConfig.filterOutDefaultConfig(updatedConf)
-    }
-
-    implicit val executorService = ioPool
-
-    private val executorHandler = {
-      val ctx = launch.executorJvmConfig
-
-      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
-        new ExecutorHandler {
-          val exitPromise = Promise[Int]()
-          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
-
-          override def destroy(): Unit = {
-            context.stop(app)
-          }
-          override def exitValue: Future[Int] = {
-            exitPromise.future
-          }
-        }
-      } else {
-        createProcess(ctx)
-      }
-    }
-
-    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
-
-      val process = Future {
-        val jarPath = ctx.jar.map { appJar =>
-          val tempFile = File.createTempFile(appJar.name, ".jar")
-          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
-          val file = new URL("file:" + tempFile)
-          file.getFile
-        }
-
-        val configFile = {
-          val configFile = File.createTempFile("gearpump", ".conf")
-          ClusterConfig.saveConfig(executorConfig, configFile)
-          val file = new URL("file:" + configFile)
-          file.getFile
-        }
-
-        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
-          ctx.classPath.map(path => expandEnvironment(path)) ++
-          jarPath.map(Array(_)).getOrElse(Array.empty[String])
-
-        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
-        val logArgs = List(
-          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
-          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
-          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
-          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
-        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
-
-        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
-
-        // Remote debug executor process
-        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
-        val remoteDebugConfig = if (remoteDebugFlag) {
-          val availablePort = Util.findFreePort().get
-          List(
-            "-Xdebug",
-            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
-            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
-          )
-        } else {
-          List.empty[String]
-        }
-
-        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
-        val verboseGCConfig = if (verboseGCFlag) {
-          List(
-            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
-            "-verbose:gc",
-            "-XX:+PrintGCDetails",
-            "-XX:+PrintGCDateStamps",
-            "-XX:+PrintTenuringDistribution",
-            "-XX:+PrintGCApplicationConcurrentTime",
-            "-XX:+PrintGCApplicationStoppedTime"
-          )
-        } else {
-          List.empty[String]
-        }
-
-        val ipv4 = List(s"-D${PREFER_IPV4}=true")
-
-        val options = ctx.jvmArguments ++ username ++
-          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
-
-        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
-          options, classPath, ctx.mainClass, ctx.arguments)
-
-        ProcessInfo(process, jarPath, configFile)
-      }
-
-      new ExecutorHandler {
-
-        var destroyed = false
-
-        override def destroy(): Unit = {
-          LOG.info(s"Destroy executor process ${ctx.mainClass}")
-          if (!destroyed) {
-            destroyed = true
-            process.foreach { info =>
-              info.process.destroy()
-              info.jarPath.foreach(new File(_).delete())
-              new File(info.configFile).delete()
-            }
-          }
-        }
-
-        override def exitValue: Future[Int] = {
-          process.flatMap { info =>
-            val exit = info.process.exitValue()
-            if (exit == 0) {
-              Future.successful(0)
-            } else {
-              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
-              s"error summary: ${info.process.logger.error}"))
-            }
-          }
-        }
-      }
-    }
-
-    private def expandEnvironment(path: String): String = {
-      // TODO: extend this to support more environment variables.
-      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
-    }
-
-    override def preStart(): Unit = {
-      executorHandler.exitValue.onComplete { value =>
-        procLauncher.cleanProcess(appId, executorId)
-        val result = ExecutorResult(value)
-        self ! result
-      }
-    }
-
-    override def postStop(): Unit = {
-      executorHandler.destroy()
-    }
-
-    // The folders are under ${GEARPUMP_HOME}
-    val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
-      File.separator + "yarn")
-
-    override def receive: Receive = {
-      case ShutdownExecutor(appId, executorId, reason: String) =>
-        executorHandler.destroy()
-        sender ! ShutdownExecutorSucceed(appId, executorId)
-        context.stop(self)
-      case ExecutorResult(executorResult) =>
-        executorResult match {
-          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
-          case Failure(e) => LOG.error("Executor exit with errors", e)
-        }
-        context.stop(self)
-    }
-
-    private def getFormattedTime(timestamp: Long): String = {
-      val datePattern = "yyyy-MM-dd-HH-mm"
-      val format = new java.text.SimpleDateFormat(datePattern)
-      format.format(timestamp)
-    }
-
-    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
-      classPath.filterNot(matchDaemonPattern(_))
-    }
-
-    private def matchDaemonPattern(path: String): Boolean = {
-      daemonPathPattern.exists(path.contains(_))
-    }
-  }
-
-  trait ExecutorHandler {
-    def destroy(): Unit
-    def exitValue: Future[Int]
-  }
-
-  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
-
-  /**
-   * Starts the executor in the same JVM as the worker.
-   */
-  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
-    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
-    private val exitCode = 0
-
-    override val supervisorStrategy =
-      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
-        case ex: Throwable =>
-          LOG.error(s"system $name stopped ", ex)
-          exit.failure(ex)
-          Stop
-      }
-
-    override def postStop(): Unit = {
-      if (!exit.isCompleted) {
-        exit.success(exitCode)
-      }
-    }
-  }
-}
\ No newline at end of file
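
repeatActionUtil above is a small but reusable pattern: a periodic retry plus a one-shot timeout, folded into a single Cancellable so callers can stop both at once. A standalone sketch of the same idea; the actor system name, intervals, and printed messages are illustrative only.

import scala.concurrent.duration._
import akka.actor.{ActorSystem, Cancellable}

object RepeatActionSketch extends App {
  val system = ActorSystem("sketch")
  import system.dispatcher

  def repeatWithTimeout(timeout: FiniteDuration, interval: FiniteDuration)(
      action: () => Unit)(onTimeout: () => Unit): Cancellable = {
    val retry = system.scheduler.schedule(Duration.Zero, interval)(action())
    val giveUp = system.scheduler.scheduleOnce(timeout)(onTimeout())
    new Cancellable {
      override def cancel(): Boolean = {
        // Cancel both tasks; report success only if both were cancelled.
        val retryCancelled = retry.cancel()
        val giveUpCancelled = giveUp.cancel()
        retryCancelled && giveUpCancelled
      }
      override def isCancelled: Boolean = retry.isCancelled && giveUp.isCancelled
    }
  }

  val handle = repeatWithTimeout(10.seconds, 2.seconds)(
    () => println("retrying registration ..."))(
    () => println("giving up"))
  Thread.sleep(5000)
  handle.cancel()
  system.terminate()
}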

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
deleted file mode 100644
index a6b75cb..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.pattern.ask
-import akka.testkit.TestActorRef
-import com.typesafe.config.ConfigValueFactory
-
-import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
-import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
-import org.apache.gearpump.cluster.master.Master
-import org.apache.gearpump.cluster.worker.Worker
-import org.apache.gearpump.util.Constants
-
-class MiniCluster {
-  private val mockMasterIP = "127.0.0.1"
-
-  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
-    withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
-
-  val (mockMaster, worker) = {
-    val master = system.actorOf(Props(classOf[Master]), "master")
-    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
-
-    // Waits until the worker registers itself with the master.
-    waitUntilWorkerIsRegistered(master)
-    (master, worker)
-  }
-
-  def launchActor(props: Props): TestActorRef[Actor] = {
-    TestActorRef(props)
-  }
-
-  private def waitUntilWorkerIsRegistered(master: ActorRef): Unit = {
-    while (!isWorkerRegistered(master)) {}
-  }
-
-  private def isWorkerRegistered(master: ActorRef): Boolean = {
-    import scala.concurrent.duration._
-    implicit val dispatcher = system.dispatcher
-
-    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
-
-    val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]
-
-    // Waits until the worker is registered.
-    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
-    workers.workers.size > 0
-  }
-
-  def shutDown(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
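
A sketch of how a test might use this harness, assuming it is on the test classpath; the query mirrors isWorkerRegistered above, and the object name is made up.

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.pattern.ask

import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
import org.apache.gearpump.cluster.MiniCluster
import org.apache.gearpump.util.Constants

object MiniClusterUsageSketch extends App {
  val cluster = new MiniCluster
  implicit val timeout = Constants.FUTURE_TIMEOUT

  // The mini cluster already waited for one worker to register.
  val workers = Await.result(
    (cluster.mockMaster ? GetAllWorkers).mapTo[WorkerList], 15.seconds)
  assert(workers.workers.nonEmpty)
  cluster.shutDown()
}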

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
deleted file mode 100644
index 90fdd39..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import java.util.Properties
-
-import akka.testkit.TestProbe
-import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.transport.HostPort
-
-import scala.concurrent.Future
-import scala.util.{Success, Try}
-
-import com.typesafe.config.{ConfigFactory, Config}
-import org.scalatest._
-
-import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
-import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
-import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.{Constants, LogUtil, Util}
-
-class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "Worker" should "register worker address to master when started." in {
-
-    val masterReceiver = createMockMaster()
-
-    val tempTestConf = convertTestConf(getHost, getPort)
-
-    val options = Array(
-      s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
-      s"-D${PREFER_IPV4}=true"
-    ) ++ getMasterListOption()
-
-    val worker = Util.startProcess(options,
-      getContextClassPath,
-      getMainClassName(Worker),
-      Array.empty)
-
-    try {
-      masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
-
-      tempTestConf.delete()
-    } finally {
-      worker.destroy()
-    }
-  }
-
-  "Master" should "accept worker RegisterNewWorker when started" in {
-    val worker = TestProbe()(getActorSystem)
-
-    val host = "127.0.0.1"
-    val port = Util.findFreePort().get
-
-    val properties = new Properties()
-    properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
-    properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
-    val masterConfig = ConfigFactory.parseProperties(properties)
-      .withFallback(TestUtil.MASTER_CONFIG)
-    Future {
-      Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
-    }
-
-    val masterProxy = getActorSystem.actorOf(
-      MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
-
-    worker.send(masterProxy, RegisterNewWorker)
-    worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
-  }
-
-  "Info" should "be started without exception" in {
-
-    val masterReceiver = createMockMaster()
-
-    Future {
-      org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
-    }
-
-    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
-    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
-  }
-
-  "Kill" should "be started without exception" in {
-
-    val masterReceiver = createMockMaster()
-
-    Future {
-      Kill.main(masterConfig, Array("-appid", "0"))
-    }
-
-    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
-    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
-  }
-
-  "Replay" should "be started without exception" in {
-
-    val masterReceiver = createMockMaster()
-
-    Future {
-      Replay.main(masterConfig, Array("-appid", "0"))
-    }
-
-    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
-    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
-    masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
-    masterReceiver.reply(ReplayApplicationResult(Success(0)))
-  }
-
-  "Local" should "be started without exception" in {
-    val port = Util.findFreePort().get
-    val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
-      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
-      s"-D${PREFER_IPV4}=true")
-
-    val local = Util.startProcess(options,
-      getContextClassPath,
-      getMainClassName(Local),
-      Array.empty)
-
-    def retry(times: Int)(fn: => Boolean): Boolean = {
-
-      LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
-
-      val result = fn
-      if (result || times <= 0) {
-        result
-      } else {
-        Thread.sleep(1000)
-        retry(times - 1)(fn)
-      }
-    }
-
-    try {
-      assert(retry(10)(isPortUsed("127.0.0.1", port)),
-        "local is not started successfully, as port is not used " + port)
-    } finally {
-      local.destroy()
-    }
-  }
-
-  "Gear" should "support app|info|kill|shell|replay" in {
-
-    val commands = Array("app", "info", "kill", "shell", "replay")
-
-    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
-
-    for (command <- commands) {
-      assert(Try(Gear.main(Array("-noexist"))).isFailure,
-        "pass unknown option, throw, command: " + command)
-    }
-
-    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
-
-    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
-    assert(tryThis.isFailure, "unknown command, throw")
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
deleted file mode 100644
index e1ba8f6..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.TestUtil
-
-class MasterWatcherSpec extends FlatSpec with Matchers {
-  def config: Config = TestUtil.MASTER_CONFIG
-
-  "MasterWatcher" should "kill itself when can not get a quorum" in {
-    val system = ActorSystem("ForMasterWatcher", config)
-
-    val actorWatcher = TestProbe()(system)
-
-    val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
-    actorWatcher watch masterWatcher
-    actorWatcher.expectTerminated(masterWatcher, 5.seconds)
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
deleted file mode 100644
index 58e3593..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.util.Success
-
-import akka.actor.{Actor, ActorRef, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
-import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
-import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import org.apache.gearpump.cluster.master.AppManager._
-import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
-import org.apache.gearpump.cluster.{TestUtil, _}
-import org.apache.gearpump.util.LogUtil
-
-class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-  var kvService: TestProbe = null
-  var haService: TestProbe = null
-  var appLauncher: TestProbe = null
-  var appManager: ActorRef = null
-  private val LOG = LogUtil.getLogger(getClass)
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-    kvService = TestProbe()(getActorSystem)
-    appLauncher = TestProbe()(getActorSystem)
-
-    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
-      new DummyAppMasterLauncherFactory(appLauncher))))
-    kvService.expectMsgType[GetKV]
-    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "AppManager" should "handle AppMaster message correctly" in {
-    val appMaster = TestProbe()(getActorSystem)
-    val appId = 1
-
-    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
-    appMaster.send(appManager, register)
-    appMaster.expectMsgType[AppMasterRegistered]
-
-    appMaster.send(appManager, ActivateAppMaster(appId))
-    appMaster.expectMsgType[AppMasterActivated]
-  }
-
-  "DataStoreService" should "support Put and Get" in {
-    val appMaster = TestProbe()(getActorSystem)
-    appMaster.send(appManager, SaveAppData(0, "key", 1))
-    kvService.expectMsgType[PutKV]
-    kvService.reply(PutKVSuccess)
-    appMaster.expectMsg(AppDataSaved)
-
-    appMaster.send(appManager, GetAppData(0, "key"))
-    kvService.expectMsgType[GetKV]
-    kvService.reply(GetKVSuccess("key", 1))
-    appMaster.expectMsg(GetAppDataResult("key", 1))
-  }
-
-  "AppManager" should "support application submission and shutdown" in {
-    testClientSubmission(withRecover = false)
-  }
-
-  "AppManager" should "support application submission and recover if appmaster dies" in {
-    LOG.info("=================testing recover==============")
-    testClientSubmission(withRecover = true)
-  }
-
-  "AppManager" should "handle client message correctly" in {
-    val mockClient = TestProbe()(getActorSystem)
-    mockClient.send(appManager, ShutdownApplication(1))
-    assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure)
-
-    mockClient.send(appManager, ResolveAppId(1))
-    assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
-
-    mockClient.send(appManager, AppMasterDataRequest(1))
-    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
-  }
-
-  "AppManager" should "reject the application submission if the app name already existed" in {
-    val app = TestUtil.dummyApp
-    val submit = SubmitApplication(app, None, "username")
-    val client = TestProbe()(getActorSystem)
-    val appMaster = TestProbe()(getActorSystem)
-    val worker = TestProbe()(getActorSystem)
-    val appId = 1
-
-    client.send(appManager, submit)
-
-    kvService.expectMsgType[PutKV]
-    appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
-      AppMasterRuntimeInfo(appId, app.name)))
-    appMaster.expectMsgType[AppMasterRegistered]
-
-    client.send(appManager, submit)
-    assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
-  }
-
-  def testClientSubmission(withRecover: Boolean): Unit = {
-    val app = TestUtil.dummyApp
-    val submit = SubmitApplication(app, None, "username")
-    val client = TestProbe()(getActorSystem)
-    val appMaster = TestProbe()(getActorSystem)
-    val worker = TestProbe()(getActorSystem)
-    val appId = 1
-
-    client.send(appManager, submit)
-
-    kvService.expectMsgType[PutKV]
-    appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
-      AppMasterRuntimeInfo(appId, app.name)))
-    kvService.expectMsgType[PutKV]
-    appMaster.expectMsgType[AppMasterRegistered]
-
-    client.send(appManager, ResolveAppId(appId))
-    client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
-
-    client.send(appManager, AppMastersDataRequest)
-    client.expectMsgType[AppMastersData]
-
-    client.send(appManager, AppMasterDataRequest(appId, false))
-    client.expectMsgType[AppMasterData]
-
-    if (!withRecover) {
-      client.send(appManager, ShutdownApplication(appId))
-      client.expectMsg(ShutdownApplicationResult(Success(appId)))
-    } else {
-      // Do recovery
-      getActorSystem.stop(appMaster.ref)
-      kvService.expectMsgType[GetKV]
-      val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
-      kvService.reply(GetKVSuccess(APP_STATE, appState))
-      appLauncher.expectMsg(LauncherStarted(appId))
-    }
-  }
-}
-
-class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
-
-  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
-      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
-    Props(new DummyAppMasterLauncher(test, appId))
-  }
-}
-
-class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
-
-  test.ref ! LauncherStarted(appId)
-  override def receive: Receive = {
-    case any: Any => test.ref forward any
-  }
-}
-
-case class LauncherStarted(appId: Int)
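
DummyAppMasterLauncher above is an instance of a common test-double pattern: a stand-in actor announces itself to a TestProbe and forwards everything it receives, so the test can assert on messages meant for the real component. A minimal standalone sketch of the pattern; all names here are illustrative.

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe

object ForwardingDoubleSketch extends App {
  case object Started

  // Announces creation, then forwards every message to the probe.
  class ForwardingDouble(probe: TestProbe) extends Actor {
    probe.ref ! Started
    override def receive: Receive = {
      case any => probe.ref forward any
    }
  }

  implicit val system = ActorSystem("sketch")
  val probe = TestProbe()
  val double = system.actorOf(Props(new ForwardingDouble(probe)))

  probe.expectMsg(Started)
  double ! "hello"
  probe.expectMsg("hello")
  system.terminate()
}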



[4/4] incubator-gearpump git commit: [GEARPUMP-224] merge gearpump-daemon to gearpump-core

Posted by ma...@apache.org.
[GEARPUMP-224] merge gearpump-daemon to gearpump-core

Author: huafengw <fv...@gmail.com>

Closes #98 from huafengw/merge.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c3d5eb63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c3d5eb63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c3d5eb63

Branch: refs/heads/master
Commit: c3d5eb63f1d0c6c542268e21fe8356e042dfa232
Parents: a01809b
Author: huafengw <fv...@gmail.com>
Authored: Fri Oct 14 19:55:05 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Oct 14 19:55:05 2016 +0800

----------------------------------------------------------------------
 .../apache/gearpump/cluster/DaemonMessage.scala |  50 ++
 .../cluster/embedded/EmbeddedCluster.scala      |  95 +++
 .../apache/gearpump/cluster/main/Local.scala    |  89 +++
 .../apache/gearpump/cluster/main/Master.scala   | 236 ++++++++
 .../apache/gearpump/cluster/main/Worker.scala   |  70 +++
 .../gearpump/cluster/master/AppManager.scala    | 354 +++++++++++
 .../cluster/master/InMemoryKVService.scala      | 122 ++++
 .../apache/gearpump/cluster/master/Master.scala | 311 ++++++++++
 .../cluster/scheduler/PriorityScheduler.scala   | 154 +++++
 .../gearpump/cluster/scheduler/Scheduler.scala  |  77 +++
 .../worker/DefaultExecutorProcessLauncher.scala |  40 ++
 .../apache/gearpump/cluster/worker/Worker.scala | 579 ++++++++++++++++++
 .../apache/gearpump/cluster/MiniCluster.scala   |  73 +++
 .../cluster/appmaster/AppManagerSpec.scala      | 182 ++++++
 .../appmaster/InMemoryKVServiceSpec.scala       |  69 +++
 .../apache/gearpump/cluster/main/MainSpec.scala | 188 ++++++
 .../cluster/main/MasterWatcherSpec.scala        |  43 ++
 .../scheduler/PrioritySchedulerSpec.scala       | 230 ++++++++
 .../gearpump/cluster/worker/WorkerSpec.scala    | 128 ++++
 .../apache/gearpump/cluster/DaemonMessage.scala |  51 --
 .../cluster/embedded/EmbeddedCluster.scala      |  95 ---
 .../apache/gearpump/cluster/main/Local.scala    |  90 ---
 .../apache/gearpump/cluster/main/Master.scala   | 236 --------
 .../apache/gearpump/cluster/main/Worker.scala   |  71 ---
 .../gearpump/cluster/master/AppManager.scala    | 355 -----------
 .../cluster/master/InMemoryKVService.scala      | 122 ----
 .../apache/gearpump/cluster/master/Master.scala | 311 ----------
 .../cluster/scheduler/PriorityScheduler.scala   | 156 -----
 .../gearpump/cluster/scheduler/Scheduler.scala  |  79 ---
 .../worker/DefaultExecutorProcessLauncher.scala |  41 --
 .../apache/gearpump/cluster/worker/Worker.scala | 581 -------------------
 .../apache/gearpump/cluster/MiniCluster.scala   |  74 ---
 .../apache/gearpump/cluster/main/MainSpec.scala | 190 ------
 .../cluster/main/MasterWatcherSpec.scala        |  44 --
 .../cluster/master/AppManagerSpec.scala         | 184 ------
 .../cluster/master/InMemoryKVServiceSpec.scala  |  69 ---
 .../scheduler/PrioritySchedulerSpec.scala       | 232 --------
 .../gearpump/cluster/worker/WorkerSpec.scala    | 129 ----
 .../apache/gearpump/redis/RedisMessage.scala    | 148 +++--
 .../org/apache/gearpump/redis/RedisSink.scala   |  27 +-
 project/Build.scala                             |  41 +-
 project/BuildExample.scala                      |   8 +-
 project/Pack.scala                              |  14 +-
 43 files changed, 3227 insertions(+), 3211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
new file mode 100644
index 0000000..1e94132
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+
+/**
+ * Cluster Bootup Flow
+ */
+object WorkerToMaster {
+
+  /** When a worker is started, it sends RegisterNewWorker */
+  case object RegisterNewWorker
+
+  /** When a worker loses connection with the master, it tries to register itself again with its old id. */
+  case class RegisterWorker(workerId: WorkerId)
+
+  /** The worker is responsible for broadcasting its current status to the master */
+  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
+}
+
+object MasterToWorker {
+
+  /** Master confirms the reception of RegisterNewWorker or RegisterWorker */
+  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
+
+  /** The worker has not received a reply from the master */
+  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
+
+  /** Master is synced with the worker on the resource slots managed by that worker */
+  case object UpdateResourceSucceed
+}
\ No newline at end of file
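
A minimal sketch of the bootup handshake these messages describe, assuming a
worker-side actor with the master's ActorRef in scope (the actor and field
names below are illustrative, not part of this commit):

    import akka.actor.{Actor, ActorRef}
    import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
    import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker}
    import org.apache.gearpump.cluster.worker.WorkerId

    class HandshakeSketch(master: ActorRef) extends Actor {
      // None on first boot; populated once the master confirms registration.
      private var myId: Option[WorkerId] = None

      override def preStart(): Unit = myId match {
        case None => master ! RegisterNewWorker // first boot: ask for a fresh id
        case Some(id) => master ! RegisterWorker(id) // reconnect: reuse the old id
      }

      def receive: Receive = {
        case WorkerRegistered(workerId, _) =>
          myId = Some(workerId) // remember the id for future re-registration
      }
    }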

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
new file mode 100644
index 0000000..9bde4d1
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.embedded
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.{Config, ConfigValueFactory}
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
+import org.apache.gearpump.util.{LogUtil, Util}
+
+/**
+ * Creates an in-process cluster with a single worker
+ */
+class EmbeddedCluster(inputConfig: Config) {
+
+  private val workerCount: Int = 1
+  private var _master: ActorRef = null
+  private var _system: ActorSystem = null
+  private var _config: Config = null
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  def start(): Unit = {
+    val port = Util.findFreePort().get
+    val akkaConf = getConfig(inputConfig, port)
+    _config = akkaConf
+    val system = ActorSystem(MASTER, akkaConf)
+
+    val master = system.actorOf(Props[MasterActor], MASTER)
+
+    0.until(workerCount).foreach { id =>
+      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+    }
+    this._master = master
+    this._system = system
+
+    LOG.info("=================================")
+    LOG.info("Local Cluster is started at: ")
+    LOG.info(s"                 127.0.0.1:$port")
+    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+  }
+
+  private def getConfig(inputConfig: Config, port: Int): Config = {
+    val config = inputConfig.
+      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+      withValue(GEARPUMP_CLUSTER_MASTERS,
+        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
+        ConfigValueFactory.fromAnyRef(true)).
+      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
+      withValue("akka.actor.provider",
+        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+    config
+  }
+
+  def newClientContext: ClientContext = {
+    ClientContext(_config, _system, _master)
+  }
+
+  def stop(): Unit = {
+    _system.stop(_master)
+    _system.terminate()
+    Await.result(_system.whenTerminated, Duration.Inf)
+  }
+}
+
+object EmbeddedCluster {
+  def apply(): EmbeddedCluster = {
+    new EmbeddedCluster(ClusterConfig.master())
+  }
+}
\ No newline at end of file
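
EmbeddedCluster above is the smallest way to run a master and worker in one
JVM; a usage sketch, assuming the gearpump core jar is on the classpath
(every name here comes from the file above):

    import org.apache.gearpump.cluster.embedded.EmbeddedCluster

    val cluster = EmbeddedCluster() // master() config, one worker
    cluster.start()
    val client = cluster.newClientContext
    // ... submit and inspect applications through `client` here ...
    cluster.stop()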

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
new file mode 100644
index 0000000..db2cd8a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.{Master => MasterActor}
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object Local extends AkkaApp with ArgumentsParser {
+  override def akkaConfig: Config = ClusterConfig.master()
+
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] =
+    Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
+      "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
+        defaultValue = Some(2)))
+
+  override val description = "Start a local cluster"
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
+      LogUtil.getLogger(getClass)
+    }
+
+    val config = parse(args)
+    if (null != config) {
+      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
+    }
+  }
+
+  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
+    if (sameProcess) {
+      LOG.info("Starting local in same process")
+      System.setProperty("LOCAL", "true")
+    }
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
+
+    if (masters.size != 1 || masters.head.host != local) {
+      LOG.error(s"${Constants.GEARPUMP_CLUSTER_MASTERS} must contain exactly one entry, " +
+        s"and its host must match ${Constants.GEARPUMP_HOSTNAME}")
+    } else {
+
+      val hostPort = masters.head
+      implicit val system = ActorSystem(MASTER, akkaConf.
+        withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
+      )
+
+      val master = system.actorOf(Props[MasterActor], MASTER)
+      val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
+
+      0.until(workerCount).foreach { id =>
+        system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
+      }
+
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
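
For illustration, calling the `local` entry point above programmatically is
equivalent to passing "-workernum 4" on the command line (a sketch; the
argument values are arbitrary):

    import org.apache.gearpump.cluster.ClusterConfig
    import org.apache.gearpump.cluster.main.Local

    Local.local(workerCount = 4, sameProcess = false, akkaConf = ClusterConfig.master())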

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
new file mode 100644
index 0000000..f758720
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.cluster.ClusterEvent._
+import akka.cluster.{MemberStatus, Member, Cluster}
+import akka.cluster.ddata.DistributedData
+import akka.cluster.singleton.{ClusterSingletonProxySettings, ClusterSingletonProxy, ClusterSingletonManagerSettings, ClusterSingletonManager}
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.Master.MasterListUpdated
+import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+object Master extends AkkaApp with ArgumentsParser {
+
+  private var LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def akkaConfig: Config = ClusterConfig.master()
+
+  override val options: Array[(String, CLIOption[Any])] =
+    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
+      "port" -> CLIOption("<master port>", required = true))
+
+  override val description = "Start Master daemon"
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
+      LogUtil.getLogger(getClass)
+    }
+
+    val config = parse(args)
+    master(config.getString("ip"), config.getInt("port"), akkaConf)
+  }
+
+  private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = {
+    masters.exists { hostPort =>
+      hostPort == s"$master:$port"
+    }
+  }
+
+  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+
+    if (!verifyMaster(ip, port, masters)) {
+      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
+        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
+      System.exit(-1)
+    }
+
+    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
+    val quorum = masterList.size() / 2 + 1
+    val masterConfig = akkaConf.
+      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
+      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
+      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
+      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
+        ConfigValueFactory.fromAnyRef(quorum))
+
+    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
+    val system = ActorSystem(MASTER, masterConfig)
+
+    val replicator = DistributedData(system).replicator
+    LOG.info(s"Replicator path: ${replicator.path}")
+
+    // Starts singleton manager
+    val singletonManager = system.actorOf(ClusterSingletonManager.props(
+      singletonProps = Props(classOf[MasterWatcher], MASTER),
+      terminationMessage = PoisonPill,
+      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
+        .withRole(MASTER)),
+      name = SINGLETON_MANAGER)
+
+    // Start master proxy
+    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
+      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
+      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
+      // Master is created when there is a majority of machines started.
+      settings = ClusterSingletonProxySettings(system)
+        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
+      name = MASTER
+    )
+
+    LOG.info(s"master proxy is started at ${masterProxy.path}")
+
+    val mainThread = Thread.currentThread()
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        if (!system.whenTerminated.isCompleted) {
+          LOG.info("Triggering shutdown hook....")
+
+          system.stop(masterProxy)
+          val cluster = Cluster(system)
+          cluster.leave(cluster.selfAddress)
+          cluster.down(cluster.selfAddress)
+          try {
+            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+          } catch {
+            case ex: Exception => // Ignore
+          }
+          system.terminate()
+          mainThread.join()
+        }
+      }
+    })
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
+
+class MasterWatcher(role: String) extends Actor with ActorLogging {
+  import context.dispatcher
+
+  val cluster = Cluster(context.system)
+
+  val config = context.system.settings.config
+  val masters = config.getList("akka.cluster.seed-nodes")
+  val quorum = masters.size() / 2 + 1
+
+  val system = context.system
+
+  // Sorts by age, oldest first
+  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
+  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
+
+  def receive: Receive = null
+
+  // Subscribes to MemberEvent, re-subscribe when restart
+  override def preStart(): Unit = {
+    cluster.subscribe(self, classOf[MemberEvent])
+    context.become(waitForInit)
+  }
+  override def postStop(): Unit = {
+    cluster.unsubscribe(self)
+  }
+
+  def matchingRole(member: Member): Boolean = member.hasRole(role)
+
+  def waitForInit: Receive = {
+    case state: CurrentClusterState => {
+      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
+        m.status == MemberStatus.Up && matchingRole(m))
+
+      if (membersByAge.size < quorum) {
+        log.info(s"We cannot get a quorum, $quorum, " +
+          s"shutting down...${membersByAge.iterator.mkString(",")}")
+        context.become(waitForShutdown)
+        self ! MasterWatcher.Shutdown
+      } else {
+        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
+        notifyMasterMembersChange(master)
+        context.become(waitForClusterEvent(master))
+      }
+    }
+  }
+
+  def waitForClusterEvent(master: ActorRef): Receive = {
+    case MemberUp(m) if matchingRole(m) => {
+      membersByAge += m
+      notifyMasterMembersChange(master)
+    }
+    case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] ||
+      mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => {
+      log.info(s"member removed ${mEvent.member}")
+      val m = mEvent.member
+      membersByAge -= m
+      if (membersByAge.size < quorum) {
+        log.info(s"We cannot get a quorum, $quorum, " +
+          s"shutting down...${membersByAge.iterator.mkString(",")}")
+        context.become(waitForShutdown)
+        self ! MasterWatcher.Shutdown
+      } else {
+        notifyMasterMembersChange(master)
+      }
+    }
+  }
+
+  private def notifyMasterMembersChange(master: ActorRef): Unit = {
+    val masters = membersByAge.toList.map{ member =>
+      MasterNode(member.address.host.getOrElse("Unknown-Host"),
+        member.address.port.getOrElse(0))
+    }
+    master ! MasterListUpdated(masters)
+  }
+
+  def waitForShutdown: Receive = {
+    case MasterWatcher.Shutdown => {
+      cluster.unsubscribe(self)
+      cluster.leave(cluster.selfAddress)
+      context.stop(self)
+      system.scheduler.scheduleOnce(Duration.Zero) {
+        try {
+          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
+        } catch {
+          case ex: Exception => // Ignore
+        }
+        system.terminate()
+      }
+    }
+  }
+}
+
+object MasterWatcher {
+  object Shutdown
+}
+
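
The quorum that both Master.main and MasterWatcher compute is a strict
majority of the configured seed nodes, so a cluster of n masters survives the
loss of fewer than half of them; a quick sanity check of the arithmetic:

    // quorum = n / 2 + 1 with integer division, as in Master.main above
    def quorum(masterCount: Int): Int = masterCount / 2 + 1

    assert(quorum(1) == 1) // a single master must stay up
    assert(quorum(3) == 2) // tolerates one failure
    assert(quorum(5) == 3) // tolerates two failures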

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
new file mode 100644
index 0000000..3d8d823
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, LogUtil}
+import org.slf4j.Logger
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/** Tool to start a worker daemon process */
+object Worker extends AkkaApp with ArgumentsParser {
+  protected override def akkaConfig = ClusterConfig.worker()
+
+  override val description = "Start a worker daemon"
+
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  private def uuid = java.util.UUID.randomUUID.toString
+
+  def main(akkaConf: Config, args: Array[String]): Unit = {
+    val id = uuid
+
+    this.LOG = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
+      // Delay creation of LOG instance to avoid creating an empty log file as we
+      // reset the log file name here
+      LogUtil.getLogger(getClass)
+    }
+
+    val system = ActorSystem(id, akkaConf)
+
+    val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address =>
+      val hostAndPort = address.split(":")
+      HostPort(hostAndPort(0), hostAndPort(1).toInt)
+    }
+
+    LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...")
+    val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}")
+
+    system.actorOf(Props(classOf[WorkerActor], masterProxy),
+      classOf[WorkerActor].getSimpleName + id)
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
\ No newline at end of file
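
The worker launcher above derives its master list by splitting each configured
"host:port" entry; the same parsing, pulled out as a standalone sketch
(`parseMaster` is an illustrative name, not part of the commit):

    import org.apache.gearpump.transport.HostPort

    def parseMaster(address: String): HostPort = {
      val Array(host, port) = address.split(":")
      HostPort(host, port.toInt)
    }

    // parseMaster("127.0.0.1:3000") == HostPort("127.0.0.1", 3000)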

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
new file mode 100644
index 0000000..0ae7365
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import akka.actor._
+import akka.pattern.ask
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
+import org.apache.gearpump.cluster.master.Master._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
+import org.slf4j.Logger
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+/**
+ * AppManager is a dedicated child of Master that manages all applications.
+ */
+private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
+  extends Actor with Stash with TimeOutScheduler {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
+  private val appMasterMaxRetries: Int = 5
+  private val appMasterRetryTimeRange: Duration = 20.seconds
+
+  implicit val timeout = FUTURE_TIMEOUT
+  implicit val executionContext = context.dispatcher
+
+  // Next available appId
+  private var nextAppId: Int = 1
+
+  // From appId to appMaster data
+  // Applications not in activeAppMasters or deadAppMasters are in pending status
+  private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+
+  // Active appMaster list where applications are in active status
+  private var activeAppMasters = Set.empty[Int]
+
+  // Dead appMaster list where applications are in inactive status
+  private var deadAppMasters = Set.empty[Int]
+
+  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
+
+  def receive: Receive = null
+
+  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
+  context.become(waitForMasterState)
+
+  def waitForMasterState: Receive = {
+    case GetKVSuccess(_, result) =>
+      val masterState = result.asInstanceOf[MasterState]
+      if (masterState != null) {
+        this.nextAppId = masterState.maxId + 1
+        this.activeAppMasters = masterState.activeAppMasters
+        this.deadAppMasters = masterState.deadAppMasters
+        this.appMasterRegistry = masterState.appMasterRegistry
+      }
+      context.become(receiveHandler)
+      unstashAll()
+    case GetKVFailed(ex) =>
+      LOG.error("Failed to get master state, shutting down master to avoid data corruption...")
+      context.parent ! PoisonPill
+    case msg =>
+      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  def receiveHandler: Receive = {
+    val msg = "Application Manager started. Ready for application submission..."
+    LOG.info(msg)
+    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
+      appDataStoreService orElse terminationWatch
+  }
+
+  def clientMsgHandler: Receive = {
+    case SubmitApplication(app, jar, username) =>
+      LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
+      val client = sender()
+      if (applicationNameExist(app.name)) {
+        client ! SubmitApplicationResult(Failure(
+          new Exception(s"Application name ${app.name} already existed")))
+      } else {
+        context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent,
+          Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
+
+        val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null)
+        appMasterRestartPolicies += nextAppId ->
+          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+        kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
+        nextAppId += 1
+      }
+
+    case RestartApplication(appId) =>
+      val client = sender()
+      (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(_, result) =>
+          val appState = result.asInstanceOf[ApplicationState]
+          if (appState != null) {
+            LOG.info(s"Shutting down the application (restart), $appId")
+            self ! ShutdownApplication(appId)
+            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
+          } else {
+            client ! SubmitApplicationResult(Failure(
+              new Exception(s"Failed to restart, because the application $appId does not exist.")
+            ))
+          }
+        case GetKVFailed(ex) =>
+          client ! SubmitApplicationResult(Failure(
+            new Exception(s"Unable to obtain the Master State. " +
+              s"Application $appId will not be restarted.")
+          ))
+      }
+
+    case ShutdownApplication(appId) =>
+      LOG.info(s"App Manager Shutting down application $appId")
+      val (_, appInfo) = appMasterRegistry.get(appId)
+        .filter { case (_, info) => !deadAppMasters.contains(info.appId)}
+        .getOrElse((null, null))
+      Option(appInfo) match {
+        case Some(info) =>
+          val worker = info.worker
+          val workerPath = Option(worker).map(_.path).orNull
+          LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID")
+          cleanApplicationData(appId)
+          val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
+            s"AppMaster $appId shutdown requested by master...")
+          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
+          sender ! ShutdownApplicationResult(Success(appId))
+        case None =>
+          val errorMsg = s"Failed to find registration information for appId: $appId"
+          LOG.error(errorMsg)
+          sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
+      }
+
+    case ResolveAppId(appId) =>
+      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
+      if (null != appMaster) {
+        sender ! ResolveAppIdResult(Success(appMaster))
+      } else {
+        sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
+      }
+
+    case AppMastersDataRequest =>
+      var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
+      appMasterRegistry.foreach(pair => {
+        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
+        val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+        val workerPath = Option(info.worker).map(worker =>
+          ActorUtil.getFullPath(context.system, worker.path))
+        val status = getAppMasterStatus(id)
+        appMastersData += AppMasterData(
+          status, id, info.appName, appMasterPath, workerPath.orNull,
+          info.submissionTime, info.startTime, info.finishTime, info.user)
+      })
+
+      sender ! AppMastersData(appMastersData.toList)
+
+    case QueryAppMasterConfig(appId) =>
+      val config =
+        if (appMasterRegistry.contains(appId)) {
+          val (_, info) = appMasterRegistry(appId)
+          info.config
+        } else {
+          null
+        }
+      sender ! AppMasterConfig(config)
+
+    case appMasterDataRequest: AppMasterDataRequest =>
+      val appId = appMasterDataRequest.appId
+      val appStatus = getAppMasterStatus(appId)
+
+      appStatus match {
+        case AppMasterNonExist =>
+          sender ! AppMasterData(AppMasterNonExist)
+        case _ =>
+          val (appMaster, info) = appMasterRegistry(appId)
+          val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+          val workerPath = Option(info.worker).map(
+            worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
+          sender ! AppMasterData(
+            appStatus, appId, info.appName, appMasterPath, workerPath,
+            info.submissionTime, info.startTime, info.finishTime, info.user)
+      }
+  }
+
+  def workerMessage: Receive = {
+    case ShutdownExecutorSucceed(appId, executorId) =>
+      LOG.info(s"Shut down executor $executorId for application $appId successfully")
+    case failed: ShutdownExecutorFailed =>
+      LOG.error(failed.reason)
+  }
+
+  private def getAppMasterStatus(appId: Int): AppMasterStatus = {
+    if (activeAppMasters.contains(appId)) {
+      AppMasterActive
+    } else if (deadAppMasters.contains(appId)) {
+      AppMasterInActive
+    } else if (appMasterRegistry.contains(appId)) {
+      AppMasterPending
+    } else {
+      AppMasterNonExist
+    }
+  }
+
+  private def shutDownExecutorTimeOut(): Unit = {
+    LOG.error(s"Shut down executor time out")
+  }
+
+  def appMasterMessage: Receive = {
+    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
+      val startTime = System.currentTimeMillis()
+      val register = registerBack.copy(startTime = startTime)
+
+      LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
+      context.watch(appMaster)
+      appMasterRegistry += register.appId -> (appMaster, register)
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      sender ! AppMasterRegistered(register.appId)
+
+    case ActivateAppMaster(appId) =>
+      LOG.info(s"Activate AppMaster for app $appId")
+      activeAppMasters += appId
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      sender ! AppMasterActivated(appId)
+  }
+
+  def appDataStoreService: Receive = {
+    case SaveAppData(appId, key, value) =>
+      val client = sender()
+      (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
+        case PutKVSuccess =>
+          client ! AppDataSaved
+        case PutKVFailed(k, ex) =>
+          client ! SaveAppDataFailed
+      }
+    case GetAppData(appId, key) =>
+      val client = sender()
+      (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(privateKey, value) =>
+          client ! GetAppDataResult(key, value)
+        case GetKVFailed(ex) =>
+          client ! GetAppDataResult(key, null)
+      }
+  }
+
+  def terminationWatch: Receive = {
+    case terminate: Terminated =>
+      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
+        s"network down: ${terminate.getAddressTerminated}")
+
+      // Now we assume that the only normal way to stop the application is submitting a
+      // ShutdownApplication request
+      val application = appMasterRegistry.find { appInfo =>
+        val (_, (actorRef, _)) = appInfo
+        actorRef.compareTo(terminate.actor) == 0
+      }
+      if (application.nonEmpty) {
+        val appId = application.get._1
+        (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+          case GetKVSuccess(_, result) =>
+            val appState = result.asInstanceOf[ApplicationState]
+            if (appState != null) {
+              LOG.info(s"Recovering application, $appId")
+              self ! RecoverApplication(appState)
+            } else {
+              LOG.error(s"Cannot find application state for $appId")
+            }
+          case GetKVFailed(ex) =>
+            LOG.error(s"Cannot find master state to recover")
+        }
+      }
+  }
+
+  def selfMsgHandler: Receive = {
+    case RecoverApplication(state) =>
+      val appId = state.appId
+      if (appMasterRestartPolicies(appId).allowRestart) {
+        LOG.info(s"AppManager Recovering Application $appId...")
+        activeAppMasters -= appId
+        kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+          MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+        context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username,
+          context.parent, None), s"launcher${appId}_${Util.randInt()}")
+      } else {
+        LOG.error(s"Application $appId failed too many times")
+      }
+  }
+
+  case class RecoverApplication(applicationStatus: ApplicationState)
+
+  private def cleanApplicationData(appId: Int): Unit = {
+    if (appMasterRegistry.contains(appId)) {
+      // Add the dead app to dead appMasters
+      deadAppMasters += appId
+      // Remove the dead app from active appMasters
+      activeAppMasters -= appId
+
+      appMasterRegistry += appId -> {
+        val (ref, info) = appMasterRegistry(appId)
+        (ref, info.copy(finishTime = System.currentTimeMillis()))
+      }
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
+      kvService ! DeleteKVGroup(appId.toString)
+    }
+  }
+
+  private def applicationNameExist(appName: String): Boolean = {
+    appMasterRegistry.values.exists { case (_, info) =>
+      info.appName == appName && !deadAppMasters.contains(info.appId)
+    }
+  }
+}
+
+object AppManager {
+  final val APP_STATE = "app_state"
+  // The id is used in KVStore
+  final val MASTER_STATE = "master_state"
+
+  case class MasterState(
+      maxId: Int,
+      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
+      activeAppMasters: Set[Int],
+      deadAppMasters: Set[Int])
+}
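
As a reading aid, the application lifecycle encoded by getAppMasterStatus
above can be mirrored with plain strings standing in for the real status
messages (a toy sketch, not code from this commit):

    object StatusSketch {
      // pending once registered, active after ActivateAppMaster,
      // inactive once cleanApplicationData moves it to the dead set
      def status(appId: Int, registered: Set[Int], active: Set[Int], dead: Set[Int]): String =
        if (active(appId)) "active"
        else if (dead(appId)) "inactive"
        else if (registered(appId)) "pending"
        else "nonexistent"
    }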

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
new file mode 100644
index 0000000..fd19bad
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ddata.{LWWMap, LWWMapKey, DistributedData}
+import akka.cluster.ddata.Replicator._
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
+
+/**
+ * A simple replicated in-memory KV service. The replicas are stored on all masters.
+ */
+class InMemoryKVService extends Actor with Stash {
+  import org.apache.gearpump.cluster.master.InMemoryKVService._
+
+  private val KV_SERVICE = "gearpump_kvservice"
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val replicator = DistributedData(context.system).replicator
+  private implicit val cluster = Cluster(context.system)
+
+  // Optimized for the write path: with majority reads/writes we can tolerate one master being down during recovery.
+  private val timeout = Duration(15, TimeUnit.SECONDS)
+  private val readMajority = ReadMajority(timeout)
+  private val writeMajority = WriteMajority(timeout)
+
+  private def groupKey(group: String): LWWMapKey[Any] = {
+    LWWMapKey[Any](KV_SERVICE + "_" + group)
+  }
+
+  def receive: Receive = kvService
+
+  def kvService: Receive = {
+
+    case GetKV(group: String, key: String) =>
+      val request = Request(sender(), key)
+      replicator ! Get(groupKey(group), readMajority, Some(request))
+    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val appData = success.get(group)
+      LOG.info(s"Successfully retrived group: ${group.id}")
+      request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
+    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      LOG.info(s"We cannot find group $group")
+      request.client ! GetKVSuccess(request.key, null)
+    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val error = s"Failed to get application data, the request key is ${request.key}"
+      LOG.error(error)
+      request.client ! GetKVFailed(new Exception(error))
+
+    case PutKV(group: String, key: String, value: Any) =>
+      val request = Request(sender(), key)
+      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
+        map + (key -> value)
+      }
+      replicator ! update
+    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVSuccess
+    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new Exception(error, cause))
+    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new TimeoutException())
+
+    case delete@DeleteKVGroup(group: String) =>
+      replicator ! Delete(groupKey(group), writeMajority)
+    case DeleteSuccess(group) =>
+      LOG.info(s"KV Group ${group.id} is deleted")
+    case ReplicationDeleteFailure(group) =>
+      LOG.error(s"Failed to delete KV Group ${group.id}...")
+    case DataDeleted(group) =>
+      LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
+  }
+}
+
+object InMemoryKVService {
+  /**
+   * KV Service related
+   */
+  case class GetKV(group: String, key: String)
+
+  trait GetKVResult
+
+  case class GetKVSuccess(key: String, value: Any) extends GetKVResult
+
+  case class GetKVFailed(ex: Throwable) extends GetKVResult
+
+  case class PutKV(group: String, key: String, value: Any)
+
+  case class DeleteKVGroup(group: String)
+
+  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
+
+  trait PutKVResult
+
+  case object PutKVSuccess extends PutKVResult
+
+  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
+
+  case class Request(client: ActorRef, key: String)
+}
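
Callers talk to this actor either with fire-and-forget tells or, as AppManager
does above, with the ask pattern; a sketch of the latter (the helper name and
key are illustrative):

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{ExecutionContext, Future}
    import org.apache.gearpump.cluster.master.InMemoryKVService._

    def saveState(kv: ActorRef, group: String, value: Any)
        (implicit timeout: Timeout, ec: ExecutionContext): Future[Boolean] = {
      (kv ? PutKV(group, "state", value)).map {
        case PutKVSuccess => true
        case _ => false // PutKVFailed or GroupDeleted
      }
    }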

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
new file mode 100644
index 0000000..6b4df07
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.lang.management.ManagementFactory
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.jarstore.JarStoreServer
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+import akka.actor._
+import akka.remote.DisassociatedEvent
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToAppMaster._
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
+import org.apache.gearpump.cluster.MasterToWorker._
+import org.apache.gearpump.cluster.WorkerToMaster._
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util._
+
+/**
+ * Master actor that manages the resources of the whole cluster.
+ * It is similar to YARN's resource manager.
+ */
+private[cluster] class Master extends Actor with Stash {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val systemConfig: Config = context.system.settings.config
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
+  // Resources and resourceRequests can be reconstructed dynamically from
+  // worker and appmaster heartbeats when the master singleton is migrated,
+  // so we don't need to persist them in the cluster.
+  private var appManager: ActorRef = null
+
+  private var scheduler: ActorRef = null
+
+  private var workers = new immutable.HashMap[ActorRef, WorkerId]
+
+  private val birth = System.currentTimeMillis()
+
+  private var nextWorkerId = 0
+
+  def receive: Receive = null
+
+  // Register jvm metrics
+  Metrics(context.system).register(new JvmMetricsSet(s"master"))
+
+  LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
+
+  val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+
+  private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
+
+  private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
+
+  // Maintain the list of active masters.
+  private var masters: List[MasterNode] = {
+    // Add myself into the list of initial masters.
+    List(MasterNode(hostPort.host, hostPort.port))
+  }
+
+  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+
+  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+  val historyMetricsService = if (metricsEnabled) {
+    val historyMetricsService = {
+      context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
+    }
+
+    val metricsReportService = context.actorOf(
+      Props(new MetricsReporterService(Metrics(context.system))))
+    historyMetricsService.tell(ReportMetrics, metricsReportService)
+    Some(historyMetricsService)
+  } else {
+    None
+  }
+
+  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
+  context.become(waitForNextWorkerId)
+
+  def waitForNextWorkerId: Receive = {
+    case GetKVSuccess(_, result) =>
+      if (result != null) {
+        this.nextWorkerId = result.asInstanceOf[Int]
+      } else {
+        LOG.warn("Cannot find existing state in the distributed cluster...")
+      }
+      context.become(receiveHandler)
+      unstashAll()
+    case GetKVFailed(ex) =>
+      LOG.error("Failed to get worker id, shutting down master to avoid data corruption...")
+      context.parent ! PoisonPill
+    case msg =>
+      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  def receiveHandler: Receive = workerMsgHandler orElse
+    appMasterMsgHandler orElse
+    onMasterListChange orElse
+    clientMsgHandler orElse
+    metricsService orElse
+    jarStoreService orElse
+    terminationWatch orElse
+    disassociated orElse
+    kvServiceMsgHandler orElse
+    ActorUtil.defaultMsgHandler(self)
+
+  def workerMsgHandler: Receive = {
+    case RegisterNewWorker =>
+      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
+      nextWorkerId += 1
+      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
+      val workerHostname = ActorUtil.getHostname(sender())
+      LOG.info(s"Register new from $workerHostname ....")
+      self forward RegisterWorker(workerId)
+
+    case RegisterWorker(id) =>
+      context.watch(sender())
+      sender ! WorkerRegistered(id, MasterInfo(self, birth))
+      scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
+      workers += (sender() -> id)
+      val workerHostname = ActorUtil.getHostname(sender())
+      LOG.info(s"Register Worker with id $id from $workerHostname ....")
+    case resourceUpdate: ResourceUpdate =>
+      scheduler forward resourceUpdate
+  }
+
+  def jarStoreService: Receive = {
+    case GetJarStoreServer =>
+      jarStore forward GetJarStoreServer
+  }
+
+  def kvServiceMsgHandler: Receive = {
+    case PutKVSuccess =>
+    // Skip
+    case PutKVFailed(key, exception) =>
+      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
+        ExceptionUtils.getStackTrace(exception))
+  }
+
+  def metricsService: Receive = {
+    case query: QueryHistoryMetrics =>
+      if (historyMetricsService.isEmpty) {
+        // Returns empty metrics so that we don't hang the UI
+        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      } else {
+        historyMetricsService.get forward query
+      }
+  }
+
+  def appMasterMsgHandler: Receive = {
+    case request: RequestResource =>
+      scheduler forward request
+    case registerAppMaster: RegisterAppMaster =>
+      appManager forward registerAppMaster
+    case activateAppMaster: ActivateAppMaster =>
+      appManager forward activateAppMaster
+    case save: SaveAppData =>
+      appManager forward save
+    case get: GetAppData =>
+      appManager forward get
+    case GetAllWorkers =>
+      sender ! WorkerList(workers.values.toList)
+    case GetMasterData =>
+      val aliveFor = System.currentTimeMillis() - birth
+      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+      val userDir = System.getProperty("user.dir")
+
+      val masterDescription =
+        MasterSummary(
+          MasterNode(hostPort.host, hostPort.port),
+          masters,
+          aliveFor,
+          logFileDir,
+          jarStoreRootPath,
+          MasterStatus.Synced,
+          userDir,
+          List.empty[MasterActivity],
+          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+          historyMetricsConfig = getHistoryMetricsConfig
+        )
+
+      sender ! MasterData(masterDescription)
+
+    case invalidAppMaster: InvalidAppMaster =>
+      appManager forward invalidAppMaster
+  }
+
+  import scala.util.{Failure, Success}
+
+  def onMasterListChange: Receive = {
+    case MasterListUpdated(masters: List[MasterNode]) =>
+      this.masters = masters
+  }
+
+  def clientMsgHandler: Receive = {
+    case app: SubmitApplication =>
+      LOG.debug(s"Receive from client, SubmitApplication $app")
+      appManager.forward(app)
+    case app: RestartApplication =>
+      LOG.debug(s"Receive from client, RestartApplication $app")
+      appManager.forward(app)
+    case app: ShutdownApplication =>
+      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
+      scheduler ! ApplicationFinished(app.appId)
+      appManager.forward(app)
+    case app: ResolveAppId =>
+      LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
+      appManager.forward(app)
+    case resolve: ResolveWorkerId =>
+      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
+      val worker = workers.find(_._2 == resolve.workerId)
+      worker match {
+        case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
+        case None => sender ! ResolveWorkerIdResult(Failure(
+          new Exception(s"cannot find worker ${resolve.workerId}")))
+      }
+    case AppMastersDataRequest =>
+      LOG.debug("Master received AppMastersDataRequest")
+      appManager forward AppMastersDataRequest
+    case appMasterDataRequest: AppMasterDataRequest =>
+      LOG.debug("Master received AppMasterDataRequest")
+      appManager forward appMasterDataRequest
+    case query: QueryAppMasterConfig =>
+      LOG.debug("Master received QueryAppMasterConfig")
+      appManager forward query
+    case QueryMasterConfig =>
+      sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+  }
+
+  def disassociated: Receive = {
+    case disassociated: DisassociatedEvent =>
+      LOG.info(s" disassociated ${disassociated.remoteAddress}")
+  }
+
+  def terminationWatch: Receive = {
+    case t: Terminated =>
+      val actor = t.actor
+      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" +
+        t.getAddressTerminated())
+
+      LOG.info("Let's filter out dead resources...")
+      // Filters out dead worker resource
+      if (workers.keySet.contains(actor)) {
+        scheduler ! WorkerTerminated(workers(actor))
+        workers -= actor
+      }
+  }
+
+  override def preStart(): Unit = {
+    val path = ActorUtil.getFullPath(context.system, self.path)
+    LOG.info(s"master path is $path")
+    val schedulerClass = Class.forName(
+      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
+
+    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
+      classOf[AppManager].getSimpleName)
+    scheduler = context.actorOf(Props(schedulerClass))
+    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
+  }
+}
+
+object Master {
+  final val MASTER_GROUP = "master_group"
+
+  final val WORKER_ID = "next_worker_id"
+
+  case class WorkerTerminated(workerId: WorkerId)
+
+  case class MasterInfo(master: ActorRef, startTime: Long = 0L)
+
+  /** Notify the subscriber that master actor list has been updated */
+  case class MasterListUpdated(masters: List[MasterNode])
+
+  object MasterInfo {
+    def empty: MasterInfo = MasterInfo(null)
+  }
+
+  case class SlotStatus(totalSlots: Int, availableSlots: Int)
+}
\ No newline at end of file
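
One detail worth noting in workerMsgHandler above: a new worker id pairs the
session counter with the registration timestamp, presumably so that a counter
value reused after master state loss still yields a distinct id; briefly:

    import org.apache.gearpump.cluster.worker.WorkerId

    val now = System.currentTimeMillis()
    val first = WorkerId(0, now)  // minted for the first RegisterNewWorker
    val second = WorkerId(1, now) // counter bumped and persisted via PutKV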

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
new file mode 100644
index 0000000..623e3ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.mutable
+
+/** Assigns resources to applications based on application priority */
+class PriorityScheduler extends Scheduler {
+  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
+
+  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
+    override def compare(x: PendingRequest, y: PendingRequest): Int = {
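+      // Higher priority id wins; for equal priorities, the earlier timestamp wins.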
+      var res = x.request.priority.id - y.request.priority.id
+      if (res == 0) {
+        res = y.timeStamp.compareTo(x.timeStamp)
+      }
+      res
+    }
+  }
+
+  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
+
+  override def allocateResource(): Unit = {
+    var scheduleLater = Array.empty[PendingRequest]
+    val resourcesSnapShot = resources.clone()
+    var allocated = Resource.empty
+    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
+
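+    // Serve requests in priority order; any request that cannot be fully
+    // satisfied now is re-queued at the end of this round via scheduleLater.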
+    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
+      val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
+      request.relaxation match {
+        case ANY =>
+          val allocations = allocateFairly(resourcesSnapShot, request)
+          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
+          if (allocations.nonEmpty) {
+            appMaster ! ResourceAllocated(allocations.toArray)
+          }
+          if (newAllocated < request.resource) {
+            val remainingRequest = request.resource - newAllocated
+            val remainingExecutors = request.executorNum - allocations.length
+            val newResourceRequest = request.copy(resource = remainingRequest,
+              executorNum = remainingExecutors)
+            scheduleLater = scheduleLater :+
+              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
+          }
+          allocated = allocated + newAllocated
+        case ONEWORKER =>
+          val availableResource = resourcesSnapShot.find { params =>
+            val (_, (_, resource)) = params
+            resource > request.resource
+          }
+          if (availableResource.nonEmpty) {
+            val (workerId, (worker, resource)) = availableResource.get
+            allocated = allocated + request.resource
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              workerId)))
+            resourcesSnapShot.update(workerId, (worker, resource - request.resource))
+          } else {
+            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+          }
+        case SPECIFICWORKER =>
+          val workerAndResource = resourcesSnapShot.get(request.workerId)
+          if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
+            val (worker, availableResource) = workerAndResource.get
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              request.workerId)))
+            allocated = allocated + request.resource
+            resourcesSnapShot.update(request.workerId, (worker,
+              availableResource - request.resource))
+          } else {
+            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+          }
+      }
+    }
+    for (request <- scheduleLater)
+      resourceRequests.enqueue(request)
+  }
+
+  def resourceRequestHandler: Receive = {
+    case RequestResource(appId, request) =>
+      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
+        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
+      val appMaster = sender()
+      resourceRequests.enqueue(PendingRequest(appId, appMaster, request,
+        System.currentTimeMillis()))
+      allocateResource()
+  }
+
+  override def doneApplication(appId: Int): Unit = {
+    resourceRequests = resourceRequests.filter(_.appId != appId)
+  }
+
+  private def allocateFairly(
+      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
+    : List[ResourceAllocation] = {
+    val workerNum = resources.size
+    var allocations = List.empty[ResourceAllocation]
+    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
+    var remainingRequest = request.resource
+    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
+
+    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
+      val executorNum = Math.min(workerNum, remainingExecutors)
+      val toRequest = Resource(remainingRequest.slots * executorNum / remainingExecutors)
+
+      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
+      val pickedResources = sortedResources.take(executorNum)
+
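+      // Expand each picked worker into individual slots and interleave them
+      // round-robin across workers, so slots are drawn as evenly as possible.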
+      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
+        val ((workerId, (worker, resource)), index) = workerWithIndex
+        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
+      }.sortBy(_._2).map(_._1)
+
+      if (flattenResource.length < toRequest.slots) {
+        // Cannot satisfy the user's requirements
+        totalAvailable = Resource.empty
+      } else {
+        flattenResource.take(toRequest.slots).groupBy(identity).mapValues(_.length)
+          .toArray.foreach { case ((workerId, worker), slots) =>
+            resources.update(workerId, (worker, resources(workerId)._2 - Resource(slots)))
+            allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
+          }
+        totalAvailable -= toRequest
+        remainingRequest -= toRequest
+        remainingExecutors -= executorNum
+      }
+    }
+    allocations
+  }
+}
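
To see the effect of requestOrdering above, here is a standalone sketch (not part
of this commit) using a simplified stand-in for PendingRequest, which carries an
ActorRef and is awkward to build in isolation:

    import scala.collection.mutable

    object OrderingDemo extends App {
      // Simplified stand-in for PendingRequest.
      case class Req(priority: Int, timeStamp: Long, name: String)

      // Same comparison as requestOrdering: higher priority id first,
      // earlier timestamp first within the same priority.
      val ordering: Ordering[Req] = new Ordering[Req] {
        override def compare(x: Req, y: Req): Int = {
          var res = x.priority - y.priority
          if (res == 0) res = y.timeStamp.compareTo(x.timeStamp)
          res
        }
      }

      val queue = new mutable.PriorityQueue[Req]()(ordering)
      queue.enqueue(Req(1, 100L, "low-old"))
      queue.enqueue(Req(2, 200L, "high-new"))
      queue.enqueue(Req(1, 50L, "low-oldest"))

      // Prints: high-new, low-oldest, low-old.
      while (queue.nonEmpty) println(queue.dequeue().name)
    }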

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
new file mode 100644
index 0000000..ec9f1ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{Actor, ActorRef}
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.WorkerTerminated
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+import scala.collection.mutable
+
+/**
+ * Scheduler schedules resources for different applications.
+ */
+abstract class Scheduler extends Actor {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
+
+  def handleScheduleMessage: Receive = {
+    case WorkerRegistered(id, _) =>
+      if (!resources.contains(id)) {
+        LOG.info(s"Worker $id added to the scheduler")
+        resources.put(id, (sender, Resource.empty))
+      }
+    case update @ ResourceUpdate(worker, workerId, resource) =>
+      LOG.info(s"Received $update")
+      if (resources.contains(workerId)) {
+        // If the worker now reports more free resource than previously known,
+        // some resource has been returned, so try allocating again.
+        val resourceReturned = resource > resources(workerId)._2
+        resources.update(workerId, (worker, resource))
+        if (resourceReturned) {
+          allocateResource()
+        }
+        sender ! UpdateResourceSucceed
+      } else {
+        sender ! UpdateResourceFailed(
+          s"ResourceUpdate failed! Worker $workerId has not been registered with the master")
+      }
+    case WorkerTerminated(workerId) =>
+      if (resources.contains(workerId)) {
+        resources -= workerId
+      }
+    case ApplicationFinished(appId) =>
+      doneApplication(appId)
+  }
+
+  def allocateResource(): Unit
+
+  def doneApplication(appId: Int): Unit
+}
+
+object Scheduler {
+  case class PendingRequest(
+      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+
+  case class ApplicationFinished(appId: Int)
+}
\ No newline at end of file
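
As a sketch of what this contract asks of a concrete scheduler (hypothetical, not
part of this commit), a first-come-first-served implementation could look like the
following; it assumes the same package as Scheduler so Resource and
ResourceAllocation resolve locally:

    package org.apache.gearpump.cluster.scheduler

    import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
    import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
    import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest

    /** Hypothetical FIFO scheduler: requests are served strictly in arrival order. */
    class FifoScheduler extends Scheduler {
      private var pending = List.empty[PendingRequest]

      override def receive: Receive = handleScheduleMessage orElse {
        case RequestResource(appId, request) =>
          pending :+= PendingRequest(appId, sender(), request, System.currentTimeMillis())
          allocateResource()
      }

      // Grant each pending request from the first worker with enough slots;
      // requests that cannot be satisfied yet stay queued.
      override def allocateResource(): Unit = {
        var stillPending = List.empty[PendingRequest]
        pending.foreach { p =>
          resources.find { case (_, (_, r)) => r.slots >= p.request.resource.slots } match {
            case Some((workerId, (worker, r))) =>
              resources.update(workerId, (worker, r - p.request.resource))
              p.appMaster ! ResourceAllocated(
                Array(ResourceAllocation(p.request.resource, worker, workerId)))
            case None =>
              stillPending :+= p
          }
        }
        pending = stillPending
      }

      override def doneApplication(appId: Int): Unit =
        pending = pending.filterNot(_.appId == appId)
    }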

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
new file mode 100644
index 0000000..3d5b0af
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
+import org.slf4j.Logger
+
+/** Launcher to start an executor process */
+class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def createProcess(
+      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+
+    LOG.info(s"Launch executor $executorId, classpath: ${classPath.mkString(File.pathSeparator)}")
+    Util.startProcess(options, classPath, mainClass, arguments)
+  }
+
+  override def cleanProcess(appId: Int, executorId: Int): Unit = {}
+}
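
A hedged usage sketch (not part of this commit); every value below, including the
main class and classpath, is a placeholder for illustration only:

    import com.typesafe.config.ConfigFactory
    import org.apache.gearpump.cluster.scheduler.Resource
    import org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher

    object LaunchDemo extends App {
      val launcher = new DefaultExecutorProcessLauncher(ConfigFactory.empty())
      // Spawns a JVM via Util.startProcess; the returned RichProcess wraps the handle.
      val process = launcher.createProcess(
        appId = 1,
        executorId = 0,
        resource = Resource(1),
        config = ConfigFactory.empty(),
        options = Array("-Xmx512m"),
        classPath = Array("lib/*"),
        mainClass = "com.example.DummyExecutor", // placeholder main class
        arguments = Array.empty[String])
    }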