You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:50 UTC

[40/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
new file mode 100644
index 0000000..a41856d
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor._
+import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.TestProbeUtil._
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
+import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
+
+class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  implicit var system: ActorSystem = null
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  "MasterWithExecutorSystemProvider" should
+    "forward request StartExecutorSystem to ExecutorSystemProvider" in {
+
+    val client = TestProbe()
+    val master = TestProbe()
+    val provider = TestProbe()
+    val providerProps: Props = provider
+    val masterEnhanced = system.actorOf(Props(
+      new MasterWithExecutorSystemProvider(master.ref, providerProps)))
+
+    val start = StartExecutorSystems(null, null)
+    client.send(masterEnhanced, start)
+    provider.expectMsg(start)
+
+    val anyOtherMessage = "any other message"
+    client.send(masterEnhanced, anyOtherMessage)
+    master.expectMsg(anyOtherMessage)
+    system.stop(masterEnhanced)
+  }
+
+  "LazyStartAppMaster" should "forward command to appmaster when app master started" in {
+
+    val appMaster = TestProbe()
+    val appMasterProps: Props = appMaster
+    val lazyAppMaster = system.actorOf(Props(new LazyStartAppMaster(appId = 0, appMasterProps)))
+    val msg = "Some"
+    lazyAppMaster ! msg
+    lazyAppMaster ! StartAppMaster
+    appMaster.expectMsg(msg)
+
+    system.stop(appMaster.ref)
+    val client = TestProbe()
+    client.watch(lazyAppMaster)
+    client.expectTerminated(lazyAppMaster)
+  }
+
+  "AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in {
+    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+      setupAppMasterRuntimeEnv()
+
+    masterConnectionKeeper.send(runtimeEnv, MasterConnected)
+    appMaster.expectMsg(StartAppMaster)
+  }
+
+  "AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in {
+
+    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+      setupAppMasterRuntimeEnv()
+
+    masterConnectionKeeper.send(runtimeEnv, MasterStopped)
+    val client = TestProbe()
+    client.watch(runtimeEnv)
+    client.expectTerminated(runtimeEnv)
+  }
+
+  "AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in {
+
+    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+      setupAppMasterRuntimeEnv()
+
+    val client = TestProbe()
+    client.watch(runtimeEnv)
+    system.stop(appMaster.ref)
+    client.expectTerminated(runtimeEnv)
+  }
+
+  private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = {
+    val appContext = AppMasterContext(0, null, null, null, null, null, null)
+    val app = AppDescription("app", "AppMasterClass", null, null)
+    val master = TestProbe()
+    val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master)
+    val appMaster = TestProbe()
+    val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMaster)
+    val masterConnectionKeeper = TestProbe()
+    val masterConnectionKeeperFactory =
+      (_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) =>
+        toProps(masterConnectionKeeper)
+
+    val runtimeEnv = system.actorOf(
+      Props(new AppMasterRuntimeEnvironment(
+        appContext, app, List(master.ref.path), masterFactory,
+        appMasterFactory, masterConnectionKeeperFactory)))
+
+    TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv)
+  }
+}
+
+object AppMasterRuntimeEnvironmentSpec {
+
+  case class TestAppMasterEnv(
+      master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe,
+      appMasterRuntimeEnv: ActorRef)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
new file mode 100644
index 0000000..d40e775
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigValueFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
+import org.apache.gearpump.util.Constants
+
+class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  implicit var system: ActorSystem = null
+  val workerId: WorkerId = WorkerId(0, 0L)
+  val appId = 0
+  val executorId = 0
+  val url = "akka.tcp://worker@127.0.0.1:3000"
+  val session = Session(null, null)
+  val launchExecutorSystemTimeout = 3000
+  val activeConfig = TestUtil.DEFAULT_CONFIG.
+    withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS,
+      ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", activeConfig)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "report success when worker launch the system successfully" in {
+    val worker = TestProbe()
+    val client = TestProbe()
+
+    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
+    client.watch(launcher)
+    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
+
+    worker.expectMsgType[LaunchExecutor]
+    worker.reply(RegisterActorSystem(url))
+
+    worker.expectMsgType[ActorSystemRegistered]
+
+    client.expectMsgType[LaunchExecutorSystemSuccess]
+    client.expectTerminated(launcher)
+  }
+
+  it should "report failure when worker refuse to launch the system explicitly" in {
+    val worker = TestProbe()
+    val client = TestProbe()
+
+    val resource = Resource(4)
+
+    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
+    client.watch(launcher)
+    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, resource))
+
+    worker.expectMsgType[LaunchExecutor]
+    worker.reply(ExecutorLaunchRejected())
+
+    client.expectMsg(LaunchExecutorSystemRejected(resource, null, session))
+    client.expectTerminated(launcher)
+  }
+
+  it should "report timeout when trying to start a executor system on worker, " +
+    "and worker doesn't response" in {
+    val client = TestProbe()
+    val worker = TestProbe()
+    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
+    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
+    client.watch(launcher)
+    val waitFor = launchExecutorSystemTimeout + 10000
+    client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds)
+    client.expectTerminated(launcher, waitFor.milliseconds)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
new file mode 100644
index 0000000..6987af4
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher}
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.{AppJar, TestUtil}
+import org.apache.gearpump.jarstore.FilePath
+
+class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+  val appId = 0
+  val workerId = WorkerId(0, 0L)
+  val resource = Resource(1)
+  val resourceRequest = ResourceRequest(resource, WorkerId.unspecified)
+  val mockJar = AppJar("for_test", FilePath("PATH"))
+  val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "")
+  val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)
+
+  implicit var system: ActorSystem = null
+  var worker: TestProbe = null
+  var workerInfo: WorkerInfo = null
+  var masterProxy: TestProbe = null
+  var launcher: TestProbe = null
+  var client: TestProbe = null
+
+  override def beforeEach(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+    worker = TestProbe()
+    workerInfo = WorkerInfo(workerId, worker.ref)
+    masterProxy = TestProbe()
+    launcher = TestProbe()
+    client = TestProbe()
+
+    val scheduler = system.actorOf(
+      Props(new ExecutorSystemScheduler(appId, masterProxy.ref, (appId: Int, session: Session) => {
+        Props(new MockExecutorSystemLauncher(launcher, session))
+      })))
+
+    client.send(scheduler, start)
+    masterProxy.expectMsg(RequestResource(appId, resourceRequest))
+  }
+
+  override def afterEach(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = {
+    val launcherStarted = launcher.receiveOne(15.seconds)
+
+    launcherStarted match {
+      case start: ExecutorSystemLauncherStarted => Some(start)
+      case x =>
+        assert(false, "ExecutorSystemLauncherStarted == false")
+        None
+    }
+  }
+
+  it should "schedule and launch an executor system on target worker successfully" in {
+
+    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
+
+    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
+
+    var systemId = 0
+    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
+
+    val executorSystemProbe = TestProbe()
+    val executorSystem =
+      ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
+    launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session))
+    client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar)))
+  }
+
+  it should "report failure when resource cannot be allocated" in {
+    client.expectMsg(30.seconds, StartExecutorSystemTimeout)
+  }
+
+  it should "schedule new resouce on new worker " +
+    "when target worker reject creating executor system" in {
+    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
+    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
+
+    var systemId = 0
+    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
+    launcher.reply(LaunchExecutorSystemRejected(resource, "", session))
+    masterProxy.expectMsg(RequestResource(appId, resourceRequest))
+  }
+
+  it should "report failure when resource is allocated, but timeout " +
+    "when starting the executor system" in {
+    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
+    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
+
+    var systemId = 0
+    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
+    launcher.reply(LaunchExecutorSystemTimeout(session))
+    client.expectMsg(StartExecutorSystemTimeout)
+  }
+}
+
+object ExecutorSystemSchedulerSpec {
+  class MockExecutorSystemLauncher(forwardTo: TestProbe, session: Session) extends Actor {
+    forwardTo.ref ! ExecutorSystemLauncherStarted(session)
+
+    def receive: Receive = {
+      case msg => forwardTo.ref forward msg
+    }
+  }
+
+  case class ExecutorSystemLauncherStarted(session: Session)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
new file mode 100644
index 0000000..163da0a
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _}
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv
+import org.apache.gearpump.cluster.master.MasterProxy.WatchMaster
+
+class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  implicit var system: ActorSystem = null
+  val register = RegisterAppMaster(null, null)
+  val appId = 0
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = {
+    val statusChangeSubscriber = TestProbe()
+    val master = TestProbe()
+
+    val keeper = system.actorOf(Props(
+      new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
+    statusChangeSubscriber.watch(keeper)
+
+    master.expectMsgType[WatchMaster]
+
+    // Master is alive, response to RegisterAppMaster
+    master.expectMsgType[RegisterAppMaster]
+    master.reply(AppMasterRegistered(appId))
+
+    // Notify listener that master is alive
+    statusChangeSubscriber.expectMsg(MasterConnected)
+    ConnectionKeeperTestEnv(master, keeper, statusChangeSubscriber)
+  }
+
+  it should "start correctly and notify listener that master is alive" in {
+    startMasterConnectionKeeper
+  }
+
+  it should "re-register the appmaster when master is restarted" in {
+    import org.apache.gearpump.cluster.master.MasterProxy.MasterRestarted
+    val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
+
+    // Master is restarted
+    master.send(keeper, MasterRestarted)
+    master.expectMsgType[RegisterAppMaster]
+    master.reply(AppMasterRegistered(appId))
+    masterChangeListener.expectMsg(MasterConnected)
+
+    // Recovery from Master restart is transparent to listener
+    masterChangeListener.expectNoMsg()
+  }
+
+  it should "notify listener and then shutdown itself when master is dead" in {
+    val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
+
+    // Master is dead
+    master.send(keeper, MasterStopped)
+
+    // Keeper should tell the listener that master is stopped before shutting down itself
+    masterChangeListener.expectMsg(MasterStopped)
+    masterChangeListener.expectTerminated(keeper)
+  }
+
+  it should "mark the master as dead when timeout" in {
+    val statusChangeSubscriber = TestProbe()
+    val master = TestProbe()
+
+    // MasterConnectionKeeper register itself to master by sending RegisterAppMaster
+    val keeper = system.actorOf(Props(new MasterConnectionKeeper(register,
+      master.ref, statusChangeSubscriber.ref)))
+
+    // Master doesn't reply to keeper,
+    statusChangeSubscriber.watch(keeper)
+
+    // Timeout, keeper notify listener, and then make suicide
+    statusChangeSubscriber.expectMsg(60.seconds, MasterStopped)
+    statusChangeSubscriber.expectTerminated(keeper, 60.seconds)
+  }
+}
+
+object MasterConnectionKeeperSpec {
+  case class ConnectionKeeperTestEnv(
+      master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala
new file mode 100644
index 0000000..0c7c0c3
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/ArgumentParserSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.scalatest.{FlatSpec, Matchers}
+
+class ArgumentParserSpec extends FlatSpec with Matchers {
+  it should "parse arguments correctly" in {
+
+    val parser = new ArgumentsParser {
+      override val options = Array(
+        "flag" -> CLIOption[Any]("", required = true),
+        "opt1" -> CLIOption[Any]("", required = true),
+        "opt2" -> CLIOption[Any]("", required = true))
+    }
+
+    val result = parser.parse(Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2"))
+    assert(result.getBoolean("flag"))
+    assert(result.getInt("opt1") == 1)
+    assert(result.getString("opt1") == "1")
+    assert(result.getInt("opt2") == 2)
+
+    assert(result.remainArgs(0) == "arg1")
+    assert(result.remainArgs(1) == "arg2")
+  }
+
+  it should "handle interleaved options and remain args" in {
+
+    val parser = new ArgumentsParser {
+      override val options = Array(
+        "opt1" -> CLIOption[Any]("", required = true))
+    }
+
+    val result = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2"))
+    assert(result.getInt("opt1") == 1)
+
+    assert(result.remainArgs.length == 3)
+
+    intercept[Exception] {
+      parser.parse(Array("-opt2"))
+    }
+
+    intercept[Exception] {
+      parser.parse(Array("-opt2", "2", "-opt1"))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala
new file mode 100644
index 0000000..ac08276
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/AppMasterLauncherSpec.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.util.Success
+
+import akka.actor._
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.util.ActorSystemBooter._
+
+class AppMasterLauncherSpec extends FlatSpec with Matchers
+  with BeforeAndAfterEach with MasterHarness {
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  val appId = 1
+  val executorId = 2
+  var master: TestProbe = null
+  var client: TestProbe = null
+  var worker: TestProbe = null
+  var watcher: TestProbe = null
+  var appMasterLauncher: ActorRef = null
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    master = createMockMaster()
+    client = TestProbe()(getActorSystem)
+    worker = TestProbe()(getActorSystem)
+    watcher = TestProbe()(getActorSystem)
+    appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId,
+      TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
+    watcher watch appMasterLauncher
+    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
+    val resource = ResourceAllocated(
+      Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
+    master.reply(resource)
+    worker.expectMsgType[LaunchExecutor]
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "AppMasterLauncher" should "launch appmaster correctly" in {
+    worker.reply(RegisterActorSystem("systempath"))
+    worker.expectMsgType[ActorSystemRegistered]
+
+    worker.expectMsgType[CreateActor]
+    worker.reply(ActorCreated(master.ref, "appmaster"))
+
+    client.expectMsg(SubmitApplicationResult(Success(appId)))
+    watcher.expectTerminated(appMasterLauncher)
+  }
+
+  "AppMasterLauncher" should "reallocate resource if executor launch rejected" in {
+    worker.reply(ExecutorLaunchRejected(""))
+    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
+
+    val resource = ResourceAllocated(
+      Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
+    master.reply(resource)
+    worker.expectMsgType[LaunchExecutor]
+
+    worker.reply(RegisterActorSystem("systempath"))
+    worker.expectMsgType[ActorSystemRegistered]
+
+    worker.expectMsgType[CreateActor]
+    worker.reply(CreateActorFailed("", new Exception))
+    worker.expectMsgType[ShutdownExecutor]
+    assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
+    watcher.expectTerminated(appMasterLauncher)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
new file mode 100644
index 0000000..a8adaf0
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.appmaster.ApplicationState
+
+class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
+
+  "ApplicationState" should "check equal with respect to only appId and attemptId" in {
+    val stateA = ApplicationState(0, "application0", 0, null, null, null, "A")
+    val stateB = ApplicationState(0, "application0", 0, null, null, null, "B")
+    val stateC = ApplicationState(0, "application1", 1, null, null, null, "A")
+
+    assert(stateA == stateB)
+    assert(stateA.hashCode == stateB.hashCode)
+    assert(stateA != stateC)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala
new file mode 100644
index 0000000..ff1e7b1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterProxySpec.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+class MasterProxySpec {
+
+  // Master proxy retries multiple times to find the master
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala
new file mode 100644
index 0000000..4070d8c
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/master/MasterSpec.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+class MasterSpec {
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
new file mode 100644
index 0000000..aa27d8f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/metrics/MetricsSpec.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
+
+class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
+
+  "Counter" should "handle sampleRate == 1" in {
+
+    val mockBaseCounter = mock[CodaHaleCounter]
+
+    val counter = new Counter("c", mockBaseCounter)
+
+    counter.inc()
+    counter.inc()
+
+    verify(mockBaseCounter, times(2)).inc(1)
+  }
+
+  "Counter" should "handle sampleRate == 3" in {
+
+    val mockBaseCounter = mock[CodaHaleCounter]
+
+    val counter = new Counter("c", mockBaseCounter, 3)
+
+    counter.inc(1)
+    counter.inc(1)
+    counter.inc(1)
+    counter.inc(1)
+    counter.inc(1)
+    counter.inc(1)
+
+    verify(mockBaseCounter, times(2)).inc(3)
+  }
+
+  "Histogram" should "handle sampleRate == 1" in {
+
+    val mockBaseHistogram = mock[CodaHaleHistogram]
+
+    val histogram = new Histogram("h", mockBaseHistogram)
+
+    histogram.update(3)
+    histogram.update(7)
+    histogram.update(5)
+    histogram.update(9)
+
+    verify(mockBaseHistogram, times(4)).update(anyLong())
+  }
+
+  "Histogram" should "handle sampleRate > 1" in {
+
+    val mockBaseHistogram = mock[CodaHaleHistogram]
+
+    val histogram = new Histogram("h", mockBaseHistogram, 2)
+
+    histogram.update(3)
+    histogram.update(4)
+    histogram.update(5)
+    histogram.update(6)
+
+    verify(mockBaseHistogram, times(1)).update(4L)
+    verify(mockBaseHistogram, times(1)).update(6L)
+  }
+
+  "Meter" should "handle sampleRate == 1" in {
+
+    val mockBaseMeter = mock[CodaHaleMeter]
+
+    val meter = new Meter("m", mockBaseMeter)
+
+    meter.mark()
+    meter.mark(3)
+
+    verify(mockBaseMeter, times(1)).mark(1L)
+    verify(mockBaseMeter, times(1)).mark(3L)
+  }
+
+  "Meter" should "handle sampleRate > 1" in {
+
+    val mockBaseMeter = mock[CodaHaleMeter]
+
+    val meter = new Meter("m", mockBaseMeter, 2)
+
+    meter.mark(1)
+    meter.mark(3)
+
+    meter.mark(5)
+    meter.mark(7)
+
+    verify(mockBaseMeter, times(1)).mark(4L)
+    verify(mockBaseMeter, times(1)).mark(12L)
+  }
+
+  "Metrics" should "have a name" in {
+    val metrics = new Metrics(3)
+    assert(metrics.counter("counter").name == "counter")
+    assert(metrics.histogram("histogram").name == "histogram")
+    assert(metrics.meter("meter").name == "meter")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
new file mode 100644
index 0000000..14be887
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/partitioner/PartitionerSpec.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.partitioner
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+
+class PartitionerSpec extends FlatSpec with Matchers {
+  val NUM = 10
+
+  "HashPartitioner" should "hash same key to same slots" in {
+    val partitioner = new HashPartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
+
+    assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" +
+      "consistent result")
+  }
+
+  "ShufflePartitioner" should "hash same key randomly" in {
+    val partitioner = new ShufflePartitioner
+
+    val data = new Array[Byte](1000)
+    (new java.util.Random()).nextBytes(data)
+    val msg = Message(data)
+
+    val partition = partitioner.getPartition(msg, NUM)
+    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
+
+    assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" +
+      "consistent result")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
new file mode 100644
index 0000000..ffe47be
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+
+class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
+  it should "authenticate correctly" in {
+    val config = TestUtil.UI_CONFIG
+    implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config)
+    implicit val ec = system.dispatcher
+    val timeout = 30.seconds
+
+    val authenticator = new ConfigFileBasedAuthenticator(config)
+    val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout)
+    val admin = Await.result(authenticator.authenticate("admin", "admin", ec), timeout)
+
+    val nonexist = Await.result(authenticator.authenticate("nonexist", "nonexist", ec), timeout)
+
+    val failedGuest = Await.result(authenticator.authenticate("guest", "wrong", ec), timeout)
+    val failedAdmin = Await.result(authenticator.authenticate("admin", "wrong", ec), timeout)
+
+    assert(guest == Authenticator.Guest)
+    assert(admin == Authenticator.Admin)
+    assert(nonexist == Authenticator.UnAuthenticated)
+    assert(failedGuest == Authenticator.UnAuthenticated)
+    assert(failedAdmin == Authenticator.UnAuthenticated)
+
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala
new file mode 100644
index 0000000..1cec360
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/security/PasswordUtilSpec.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.security
+
+import org.scalatest.{FlatSpec, Matchers}
+
+class PasswordUtilSpec extends FlatSpec with Matchers {
+
+  it should "verify the credential correctly" in {
+    val password = "password"
+
+    val digest1 = PasswordUtil.hash(password)
+    val digest2 = PasswordUtil.hash(password)
+
+    // Uses different salt each time, thus creating different hash.
+    assert(digest1 != digest2)
+
+    // Both are valid hash.
+    assert(PasswordUtil.verify(password, digest1))
+    assert(PasswordUtil.verify(password, digest2))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
new file mode 100644
index 0000000..d4f4167
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/serializer/SerializerSpec.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.serializer
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import org.apache.gearpump.serializer.SerializerSpec._
+
+class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
+  val config = ConfigFactory.empty.withValue("gearpump.serializers",
+    ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName,
+      classOf[ClassB].getName -> classOf[ClassBSerializer].getName).asJava))
+
+  "GearpumpSerialization" should "register custom serializers" in {
+    val serialization = new GearpumpSerialization(config)
+    val kryo = new Kryo
+    serialization.customize(kryo)
+
+    val forB = kryo.getRegistration(classOf[ClassB])
+    assert(forB.getSerializer.isInstanceOf[ClassBSerializer])
+
+    val forA = kryo.getRegistration(classOf[ClassA])
+    assert(forA.getSerializer.isInstanceOf[ClassASerializer])
+  }
+
+  "FastKryoSerializer" should "serialize correctly" in {
+    val myConfig = config.withFallback(TestUtil.DEFAULT_CONFIG.withoutPath("gearpump.serializers"))
+    val system = ActorSystem("my", myConfig)
+
+    val serializer = new FastKryoSerializer(system.asInstanceOf[ExtendedActorSystem])
+
+    val bytes = serializer.serialize(new ClassA)
+    val anotherA = serializer.deserialize(bytes)
+
+    assert(anotherA.isInstanceOf[ClassA])
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
+
+object SerializerSpec {
+
+  class ClassA {}
+
+  class ClassASerializer extends KryoSerializer[ClassA] {
+    override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {
+      output.writeString(classOf[ClassA].getName.toString)
+    }
+
+    override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
+      val className = input.readString()
+      Class.forName(className).newInstance().asInstanceOf[ClassA]
+    }
+  }
+
+  class ClassB {}
+
+  class ClassBSerializer extends KryoSerializer[ClassA] {
+    override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {}
+
+    override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
+      null
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala
new file mode 100644
index 0000000..ddeb72d
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/transport/MockTransportSerializer.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+import java.io.{DataInput, DataOutput}
+import org.apache.gearpump.transport.MockTransportSerializer.NettyMessage
+import org.apache.gearpump.transport.netty.ITransportMessageSerializer
+
+class MockTransportSerializer extends ITransportMessageSerializer {
+  override def getLength(obj: scala.Any): Int = 4
+
+  override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = {
+    transportMessage match {
+      case msg: NettyMessage =>
+        dataOutput.writeInt(msg.num)
+    }
+  }
+
+  override def deserialize(dataInput: DataInput, length: Int): AnyRef = {
+    NettyMessage(dataInput.readInt())
+  }
+}
+
+object MockTransportSerializer {
+  case class NettyMessage(num: Int)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala b/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala
new file mode 100644
index 0000000..b1a429c
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/transport/NettySpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.transport
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.testkit.TestProbe
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.transport.MockTransportSerializer.NettyMessage
+import org.apache.gearpump.transport.netty.{TaskMessage, Context}
+import org.apache.gearpump.util.Util
+
+class NettySpec extends FlatSpec with Matchers with MockitoSugar {
+
+  "Netty Transport" should "send and receive message correctly " in {
+    val conf = TestUtil.DEFAULT_CONFIG
+    val system = ActorSystem("transport", conf)
+    val context = new Context(system, conf)
+    val serverActor = TestProbe()(system)
+
+    val port = Util.findFreePort()
+
+    import system.dispatcher
+    system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) {
+      context.bind("server", new ActorLookupById {
+        override def lookupLocalActor(id: Long): Option[ActorRef] = Some(serverActor.ref)
+      }, false, port.get)
+    }
+    val client = context.connect(HostPort("127.0.0.1", port.get))
+
+    val data = NettyMessage(0)
+    val msg = new TaskMessage(0, 1, 2, data)
+    client ! msg
+    serverActor.expectMsg(15.seconds, data)
+
+    context.close()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala
new file mode 100644
index 0000000..0d33d44
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/ActorSystemBooterSpec.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _}
+import org.apache.gearpump.util.ActorSystemBooterSpec._
+
+class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
+
+  "ActorSystemBooter" should "report its address back" in {
+    val boot = bootSystem()
+    boot.prob.expectMsgType[RegisterActorSystem]
+    boot.shutdown()
+  }
+
+  "ActorSystemBooter" should "terminate itself when parent actor dies" in {
+    val boot = bootSystem()
+    boot.prob.expectMsgType[RegisterActorSystem]
+
+    val dummy = boot.host.actorOf(Props(classOf[Dummy]), "dummy")
+    boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
+    boot.prob.reply(BindLifeCycle(dummy))
+    boot.host.stop(dummy)
+    val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted)
+    assert(terminated)
+    boot.shutdown()
+  }
+
+  "ActorSystemBooter" should "create new actor" in {
+    val boot = bootSystem()
+    boot.prob.expectMsgType[RegisterActorSystem]
+    boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
+    boot.prob.reply(CreateActor(Props(classOf[AcceptThreeArguments], 1, 2, 3), "three"))
+    boot.prob.expectMsgType[ActorCreated]
+
+    boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero"))
+    boot.prob.expectMsgType[ActorCreated]
+
+    boot.shutdown()
+  }
+
+  private def bootSystem(): Boot = {
+    val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG)
+
+    val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG)
+
+    val receiver = TestProbe()(system)
+    val address = ActorUtil.getFullPath(system, receiver.ref.path)
+
+    val bootSystem = booter.boot("booter", address)
+
+    Boot(system, receiver, bootSystem)
+  }
+
+  case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) {
+    def shutdown(): Unit = {
+      host.terminate()
+      bootedSystem.terminate()
+      Await.result(host.whenTerminated, Duration.Inf)
+      Await.result(bootedSystem.whenTerminated, Duration.Inf)
+    }
+  }
+
+  def retry(seconds: Int)(fn: => Boolean): Boolean = {
+    val result = fn
+    if (result) {
+      result
+    } else {
+      Thread.sleep(1000)
+      retry(seconds - 1)(fn)
+    }
+  }
+}
+
+object ActorSystemBooterSpec {
+  class Dummy extends Actor {
+    def receive: Receive = {
+      case _ =>
+    }
+  }
+
+  class AcceptZeroArguments extends Actor {
+    def receive: Receive = {
+      case _ =>
+    }
+  }
+
+  class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor {
+    def receive: Receive = {
+      case _ =>
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala
new file mode 100644
index 0000000..534bf00
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/ActorUtilSpec.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import org.scalatest.FlatSpec
+
+import org.apache.gearpump.transport.HostPort
+
+class ActorUtilSpec extends FlatSpec {
+  "masterActorPath" should "construct the ActorPath from HostPort" in {
+    import org.apache.gearpump.util.Constants.MASTER
+
+    val host = "127.0.0.1"
+    val port = 3000
+    val master = HostPort("127.0.0.1", 3000)
+    val masterPath = ActorUtil.getMasterActorPath(master)
+    assert(masterPath.address.port == Some(port))
+    assert(masterPath.address.system == MASTER)
+    assert(masterPath.address.host == Some(host))
+    assert(masterPath.address.protocol == "akka.tcp")
+    assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala
new file mode 100644
index 0000000..e7b9d18
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/ConfigsSpec.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.File
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig}
+
+class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
+  "Typesafe Cluster Configs" should "follow the override rules" in {
+
+    val conf =
+      """
+      gearpump {
+        gear = "gearpump"
+      }
+
+      gearpump-master {
+        conf = "master"
+      }
+      gearpump-worker {
+        conf = "worker"
+      }
+      conf = "base"
+      """
+
+    val file = File.createTempFile("test", ".conf")
+    FileUtils.write(file, conf)
+
+    val raw = ClusterConfig.load(ClusterConfigSource(file.toString))
+
+    assert(raw.master.getString("conf") == "master", "master > base")
+    assert(raw.worker.getString("conf") == "worker", "worker > base")
+    assert(raw.default.getString("conf") == "base", "application > base")
+
+    file.delete()
+  }
+
+  "ClusterConfigSource" should "return empty for non-exist files" in {
+    val source = ClusterConfigSource("non-exist")
+    var config = source.getConfig
+    assert(config.isEmpty)
+
+    val nullCheck = ClusterConfigSource(null)
+    config = nullCheck.getConfig
+    assert(config.isEmpty)
+  }
+
+  "User Config" should "work" in {
+
+    implicit val system = ActorSystem("forSerialization")
+
+    val map = Map[String, String]("key1" -> "1", "key2" -> "value2")
+
+    val user = new UserConfig(map)
+      .withLong("key3", 2L)
+      .withBoolean("key4", value = true)
+      .withFloat("key5", 3.14F)
+      .withDouble("key6", 2.718)
+
+    assert(user.getInt("key1").get == 1)
+    assert(user.getString("key1").get == "1")
+    assert(user.getLong("key3").get == 2L)
+    assert(user.getBoolean("key4").get == true)
+    assert(user.getFloat("key5").get == 3.14F)
+    assert(user.getDouble("key6").get == 2.718)
+
+    val data = new ConfigsSpec.Data(3)
+    assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
+
+object ConfigsSpec {
+  case class Data(value: Int)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
new file mode 100644
index 0000000..7ad83ce
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/FileUtilsSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.File
+import java.util
+
+import org.scalatest.FlatSpec
+
+import io.gearpump.google.common.io.Files
+
+class FileUtilsSpec extends FlatSpec {
+  val TXT =
+    """
+      |This is a multiple line
+      |text
+      |
+    """.stripMargin
+
+  it should "read/write string correctly" in {
+    val file = File.createTempFile("fileutilspec", ".test")
+    FileUtils.write(file, TXT)
+    assert(FileUtils.read(file) == TXT)
+    file.delete()
+  }
+
+  it should "read/write bytes array correctly" in {
+    val file = File.createTempFile("fileutilspec", ".test")
+    val bytes = TXT.toCharArray.map(_.toByte)
+    FileUtils.writeByteArrayToFile(file, bytes)
+    util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file))
+    file.delete()
+  }
+
+  it should "create directory and all parents" in {
+    val temp = Files.createTempDir()
+    val parent = new File(temp, "sub1")
+    val child = new File(parent, "sub2" + File.separator)
+    FileUtils.forceMkdir(child)
+    assert(child.exists())
+    assert(child.isDirectory)
+    child.delete()
+    parent.delete()
+    temp.delete()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
new file mode 100644
index 0000000..2b6df78
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.util.Graph.{Node, Path}
+
+class GraphSpec extends PropSpec with PropertyChecks with Matchers {
+
+  case class Vertex(id: Int)
+  case class Edge(from: Int, to: Int)
+
+  val vertexCount = 100
+
+  property("Graph with no edges should be built correctly") {
+    val vertexSet = Set("A", "B", "C")
+    val graph = Graph(vertexSet.toSeq.map(Node): _*)
+    graph.vertices.toSet shouldBe vertexSet
+  }
+
+  property("Graph with vertices and edges should be built correctly") {
+    val vertices: Array[Vertex] = 0.until(vertexCount).map(Vertex).toArray
+    val genEdge = for {
+      from <- Gen.chooseNum[Int](0, vertexCount - 1)
+      to <- Gen.chooseNum[Int](0, vertexCount - 1)
+    } yield Edge(from, to)
+
+    var graphElements = Array.empty[Path[Vertex, _ <: Edge]]
+    val outDegrees = new Array[Int](vertices.length)
+    val outGoingEdges = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)])
+    val edgesOf = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)])
+    vertices.foreach { v =>
+      graphElements :+= Node(v)
+    }
+
+    forAll(genEdge) {
+      e: Edge =>
+        val from = vertices(e.from)
+        val to = vertices(e.to)
+        graphElements :+= from ~ e ~> to
+        outDegrees(e.from) += 1
+
+        val nodeEdgeNode = (from, e, to)
+        outGoingEdges(e.from) += nodeEdgeNode
+
+        edgesOf(e.from) += nodeEdgeNode
+        edgesOf(e.to) += nodeEdgeNode
+    }
+
+    val graph: Graph[Vertex, Edge] = Graph(graphElements: _*)
+    graph.vertices should contain theSameElementsAs vertices
+
+    0.until(vertices.size).foreach { i =>
+      val v = vertices(i)
+      graph.outgoingEdgesOf(v) should contain theSameElementsAs outGoingEdges(i)
+      graph.edgesOf(v).sortBy(_._1.id)
+      graph.edgesOf(v) should contain theSameElementsAs edgesOf(i)
+    }
+  }
+
+  property("Check empty graph") {
+    val graph = Graph.empty[String, String]
+    assert(graph.isEmpty)
+  }
+
+  property("check level map for a graph") {
+    val graph = Graph.empty[String, String]
+
+    val defaultEdge = "edge"
+
+    graph.addVertex("A")
+    graph.addVertex("B")
+    graph.addVertex("C")
+
+    graph.addEdge("A", defaultEdge, "B")
+    graph.addEdge("B", defaultEdge, "C")
+    graph.addEdge("A", defaultEdge, "C")
+
+    graph.addVertex("D")
+    graph.addVertex("E")
+    graph.addVertex("F")
+
+    graph.addEdge("D", defaultEdge, "E")
+    graph.addEdge("E", defaultEdge, "F")
+    graph.addEdge("D", defaultEdge, "F")
+
+    graph.addEdge("C", defaultEdge, "E")
+
+    val levelMap = graph.vertexHierarchyLevelMap()
+
+    // Check whether the rule holds: : if vertex A -> B, then level(A) < level(B)
+    levelMap("A") < levelMap("B")
+    levelMap("A") < levelMap("C")
+    levelMap("B") < levelMap("C")
+
+    levelMap("D") < levelMap("E")
+    levelMap("D") < levelMap("F")
+    levelMap("E") < levelMap("F")
+
+    levelMap("C") < levelMap("F")
+  }
+
+  property("copy should return a immutalbe new Graph") {
+    val graph = Graph.empty[String, String]
+    val defaultEdge = "edge"
+    graph.addVertex("A")
+    graph.addVertex("B")
+    graph.addEdge("A", defaultEdge, "B")
+
+    val newGraph = graph.copy
+    newGraph.addVertex("C")
+
+    assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable")
+  }
+
+  property("subGraph should return a sub-graph for certain vertex") {
+    val graph = Graph.empty[String, String]
+    val defaultEdge = "edge"
+    graph.addVertex("A")
+    graph.addVertex("B")
+    graph.addVertex("C")
+    graph.addEdge("A", defaultEdge, "B")
+    graph.addEdge("B", defaultEdge, "C")
+    graph.addEdge("A", defaultEdge, "C")
+
+    val subGraph = graph.subGraph("C")
+    assert(subGraph.outDegreeOf("A") != graph.outDegreeOf("A"))
+  }
+
+  property("replaceVertex should hold all upstream downstream relation for a vertex") {
+    val graph = Graph.empty[String, String]
+    val defaultEdge = "edge"
+    graph.addVertex("A")
+    graph.addVertex("B")
+    graph.addVertex("C")
+    graph.addEdge("A", defaultEdge, "B")
+    graph.addEdge("B", defaultEdge, "C")
+
+    val newGraph = graph.copy.replaceVertex("B", "D")
+    assert(newGraph.inDegreeOf("D") == graph.inDegreeOf("B"))
+    assert(newGraph.outDegreeOf("D") == graph.outDegreeOf("B"))
+  }
+
+  property("Cycle detecting should work properly") {
+    val graph = Graph.empty[String, String]
+    val defaultEdge = "edge"
+    graph.addVertex("A")
+    graph.addVertex("B")
+    graph.addVertex("C")
+    graph.addEdge("A", defaultEdge, "B")
+    graph.addEdge("B", defaultEdge, "C")
+
+    assert(!graph.hasCycle())
+
+    graph.addEdge("C", defaultEdge, "B")
+    assert(graph.hasCycle())
+
+    graph.addEdge("C", defaultEdge, "A")
+    assert(graph.hasCycle())
+  }
+
+  property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " +
+    "return equal order of graph with no circle") {
+    val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5)
+    val topoNoCircles = graph.topologicalOrderIterator
+    val topoWithCircles = graph.topologicalOrderWithCirclesIterator
+
+    assert(topoNoCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
+  }
+
+  property("Topological sort of graph with circles should work properly") {
+    val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7,
+      4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10)
+    val topoWithCircles = graph.topologicalOrderWithCirclesIterator
+    val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9)
+
+    assert(trueTopoWithCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
+  }
+
+  property("Duplicated edges detecting should work properly") {
+    val graph = Graph.empty[String, String]
+    val defaultEdge = "edge"
+    val anotherEdge = "edge2"
+    graph.addVertex("A")
+    graph.addVertex("B")
+    graph.addEdge("A", defaultEdge, "B")
+
+    assert(!graph.hasDuplicatedEdge())
+
+    graph.addEdge("A", anotherEdge, "B")
+
+    assert(graph.hasDuplicatedEdge())
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala
new file mode 100644
index 0000000..6f18497
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/TimeOutSchedulerSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import scala.concurrent.duration._
+
+import akka.actor._
+import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.TestUtil
+
+class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("WorkerSpec", TestUtil.DEFAULT_CONFIG))
+  val mockActor = TestProbe()
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The TimeOutScheduler" should {
+    "handle the time out event" in {
+      val testActorRef = TestActorRef(Props(classOf[TestActor], mockActor.ref))
+      val testActor = testActorRef.underlyingActor.asInstanceOf[TestActor]
+      testActor.sendMsgToIgnore()
+      mockActor.expectMsg(30.seconds, MessageTimeOut)
+    }
+  }
+}
+
+case object Echo
+case object MessageTimeOut
+
+class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  val target = context.actorOf(Props(classOf[EchoActor]))
+
+  override def receive: Receive = {
+    case _ =>
+  }
+
+  def sendMsgToIgnore(): Unit = {
+    sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut())
+  }
+
+  private def sendMsgTimeOut(): Unit = {
+    mock ! MessageTimeOut
+  }
+}
+
+class EchoActor extends Actor {
+  override def receive: Receive = {
+    case _ =>
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala b/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala
new file mode 100644
index 0000000..6e6cec1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/util/UtilSpec.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Util._
+
+class UtilSpec extends FlatSpec with Matchers with MockitoSugar {
+  it should "work" in {
+
+    assert(findFreePort().isSuccess)
+
+    assert(randInt() != randInt())
+
+    val hosts = parseHostList("host1:1,host2:2")
+    assert(hosts(1) == HostPort("host2", 2))
+
+    assert(Util.getCurrentClassPath.length > 0)
+  }
+
+  it should "check application name properly" in {
+    assert(Util.validApplicationName("_application_1"))
+    assert(Util.validApplicationName("application_1_"))
+    assert(!Util.validApplicationName("0_application_1"))
+    assert(!Util.validApplicationName("_applicat&&ion_1"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
deleted file mode 100644
index d226af9..0000000
--- a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
+++ /dev/null
@@ -1,20 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-io.gearpump.jarstore.local.LocalJarStoreService
-io.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
new file mode 100644
index 0000000..bf37316
--- /dev/null
+++ b/daemon/src/main/resources/META-INF/services/org.apache.gearpump.jarstore.JarStoreService
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.gearpump.jarstore.local.LocalJarStoreService
+org.apache.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
deleted file mode 100644
index 420f1b6..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.cluster
-
-import akka.actor.ActorRef
-
-import io.gearpump.cluster.master.Master.MasterInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-
-/**
- * Cluster Bootup Flow
- */
-object WorkerToMaster {
-
-  /** When an worker is started, it sends RegisterNewWorker */
-  case object RegisterNewWorker
-
-  /** When worker lose connection with master, it tries to register itself again with old Id. */
-  case class RegisterWorker(workerId: WorkerId)
-
-  /** Worker is responsible to broadcast its current status to master */
-  case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
-}
-
-object MasterToWorker {
-
-  /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
-  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
-
-  /** Worker have not received reply from master */
-  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
-
-  /** Master is synced with worker on resource slots managed by current worker */
-  case object UpdateResourceSucceed
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
deleted file mode 100644
index 53da645..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.embedded
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{Config, ConfigValueFactory}
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.master.{Master => MasterActor}
-import io.gearpump.cluster.worker.{Worker => WorkerActor}
-import io.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
-import io.gearpump.util.{LogUtil, Util}
-
-/**
- * Create a in-process cluster with single worker
- */
-class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  def start(): Unit = {
-    val port = Util.findFreePort().get
-    val akkaConf = getConfig(inputConfig, port)
-    _config = akkaConf
-    val system = ActorSystem(MASTER, akkaConf)
-
-    val master = system.actorOf(Props[MasterActor], MASTER)
-
-    0.until(workerCount).foreach { id =>
-      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-    }
-    this._master = master
-    this._system = system
-
-    LOG.info("=================================")
-    LOG.info("Local Cluster is started at: ")
-    LOG.info(s"                 127.0.0.1:$port")
-    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
-  }
-
-  private def getConfig(inputConfig: Config, port: Int): Config = {
-    val config = inputConfig.
-      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(GEARPUMP_CLUSTER_MASTERS,
-        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
-      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
-        ConfigValueFactory.fromAnyRef(true)).
-      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
-      withValue("akka.actor.provider",
-        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
-    config
-  }
-
-  def newClientContext: ClientContext = {
-    ClientContext(_config, _system, _master)
-  }
-
-  def stop(): Unit = {
-    _system.stop(_master)
-    _system.terminate()
-    Await.result(_system.whenTerminated, Duration.Inf)
-  }
-}
-
-object EmbeddedCluster {
-  def apply(): EmbeddedCluster = {
-    new EmbeddedCluster(ClusterConfig.master())
-  }
-}
\ No newline at end of file