You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:51 UTC

[41/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
deleted file mode 100644
index 00bd408..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.TestProbeUtil._
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster._
-import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
-import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-
-class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  implicit var system: ActorSystem = null
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  "MasterWithExecutorSystemProvider" should
-    "forward request StartExecutorSystem to ExecutorSystemProvider" in {
-
-    val client = TestProbe()
-    val master = TestProbe()
-    val provider = TestProbe()
-    val providerProps: Props = provider
-    val masterEnhanced = system.actorOf(Props(
-      new MasterWithExecutorSystemProvider(master.ref, providerProps)))
-
-    val start = StartExecutorSystems(null, null)
-    client.send(masterEnhanced, start)
-    provider.expectMsg(start)
-
-    val anyOtherMessage = "any other message"
-    client.send(masterEnhanced, anyOtherMessage)
-    master.expectMsg(anyOtherMessage)
-    system.stop(masterEnhanced)
-  }
-
-  "LazyStartAppMaster" should "forward command to appmaster when app master started" in {
-
-    val appMaster = TestProbe()
-    val appMasterProps: Props = appMaster
-    val lazyAppMaster = system.actorOf(Props(new LazyStartAppMaster(appId = 0, appMasterProps)))
-    val msg = "Some"
-    lazyAppMaster ! msg
-    lazyAppMaster ! StartAppMaster
-    appMaster.expectMsg(msg)
-
-    system.stop(appMaster.ref)
-    val client = TestProbe()
-    client.watch(lazyAppMaster)
-    client.expectTerminated(lazyAppMaster)
-  }
-
-  "AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in {
-    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
-      setupAppMasterRuntimeEnv()
-
-    masterConnectionKeeper.send(runtimeEnv, MasterConnected)
-    appMaster.expectMsg(StartAppMaster)
-  }
-
-  "AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in {
-
-    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
-      setupAppMasterRuntimeEnv()
-
-    masterConnectionKeeper.send(runtimeEnv, MasterStopped)
-    val client = TestProbe()
-    client.watch(runtimeEnv)
-    client.expectTerminated(runtimeEnv)
-  }
-
-  "AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in {
-
-    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
-      setupAppMasterRuntimeEnv()
-
-    val client = TestProbe()
-    client.watch(runtimeEnv)
-    system.stop(appMaster.ref)
-    client.expectTerminated(runtimeEnv)
-  }
-
-  private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = {
-    val appContext = AppMasterContext(0, null, null, null, null, null, null)
-    val app = AppDescription("app", "AppMasterClass", null, null)
-    val master = TestProbe()
-    val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master)
-    val appMaster = TestProbe()
-    val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMaster)
-    val masterConnectionKeeper = TestProbe()
-    val masterConnectionKeeperFactory =
-      (_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) =>
-        toProps(masterConnectionKeeper)
-
-    val runtimeEnv = system.actorOf(
-      Props(new AppMasterRuntimeEnvironment(
-        appContext, app, List(master.ref.path), masterFactory,
-        appMasterFactory, masterConnectionKeeperFactory)))
-
-    TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv)
-  }
-}
-
-object AppMasterRuntimeEnvironmentSpec {
-
-  case class TestAppMasterEnv(
-      master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe,
-      appMasterRuntimeEnv: ActorRef)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
deleted file mode 100644
index 3595960..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigValueFactory
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
-import io.gearpump.util.Constants
-
-class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  implicit var system: ActorSystem = null
-  val workerId: WorkerId = WorkerId(0, 0L)
-  val appId = 0
-  val executorId = 0
-  val url = "akka.tcp://worker@127.0.0.1:3000"
-  val session = Session(null, null)
-  val launchExecutorSystemTimeout = 3000
-  val activeConfig = TestUtil.DEFAULT_CONFIG.
-    withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS,
-      ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", activeConfig)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "report success when worker launch the system successfully" in {
-    val worker = TestProbe()
-    val client = TestProbe()
-
-    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
-    client.watch(launcher)
-    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
-
-    worker.expectMsgType[LaunchExecutor]
-    worker.reply(RegisterActorSystem(url))
-
-    worker.expectMsgType[ActorSystemRegistered]
-
-    client.expectMsgType[LaunchExecutorSystemSuccess]
-    client.expectTerminated(launcher)
-  }
-
-  it should "report failure when worker refuse to launch the system explicitly" in {
-    val worker = TestProbe()
-    val client = TestProbe()
-
-    val resource = Resource(4)
-
-    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
-    client.watch(launcher)
-    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, resource))
-
-    worker.expectMsgType[LaunchExecutor]
-    worker.reply(ExecutorLaunchRejected())
-
-    client.expectMsg(LaunchExecutorSystemRejected(resource, null, session))
-    client.expectTerminated(launcher)
-  }
-
-  it should "report timeout when trying to start a executor system on worker, " +
-    "and worker doesn't response" in {
-    val client = TestProbe()
-    val worker = TestProbe()
-    val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
-    client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
-    client.watch(launcher)
-    val waitFor = launchExecutorSystemTimeout + 10000
-    client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds)
-    client.expectTerminated(launcher, waitFor.milliseconds)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
deleted file mode 100644
index c2f6ee8..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
-import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
-import io.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher}
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppJar, TestUtil}
-import io.gearpump.jarstore.FilePath
-
-class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-  val appId = 0
-  val workerId = WorkerId(0, 0L)
-  val resource = Resource(1)
-  val resourceRequest = ResourceRequest(resource, WorkerId.unspecified)
-  val mockJar = AppJar("for_test", FilePath("PATH"))
-  val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "")
-  val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)
-
-  implicit var system: ActorSystem = null
-  var worker: TestProbe = null
-  var workerInfo: WorkerInfo = null
-  var masterProxy: TestProbe = null
-  var launcher: TestProbe = null
-  var client: TestProbe = null
-
-  override def beforeEach(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-    worker = TestProbe()
-    workerInfo = WorkerInfo(workerId, worker.ref)
-    masterProxy = TestProbe()
-    launcher = TestProbe()
-    client = TestProbe()
-
-    val scheduler = system.actorOf(
-      Props(new ExecutorSystemScheduler(appId, masterProxy.ref, (appId: Int, session: Session) => {
-        Props(new MockExecutorSystemLauncher(launcher, session))
-      })))
-
-    client.send(scheduler, start)
-    masterProxy.expectMsg(RequestResource(appId, resourceRequest))
-  }
-
-  override def afterEach(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = {
-    val launcherStarted = launcher.receiveOne(15.seconds)
-
-    launcherStarted match {
-      case start: ExecutorSystemLauncherStarted => Some(start)
-      case x =>
-        assert(false, "ExecutorSystemLauncherStarted == false")
-        None
-    }
-  }
-
-  it should "schedule and launch an executor system on target worker successfully" in {
-
-    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
-
-    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
-
-    var systemId = 0
-    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
-
-    val executorSystemProbe = TestProbe()
-    val executorSystem =
-      ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
-    launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session))
-    client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar)))
-  }
-
-  it should "report failure when resource cannot be allocated" in {
-    client.expectMsg(30.seconds, StartExecutorSystemTimeout)
-  }
-
-  it should "schedule new resouce on new worker " +
-    "when target worker reject creating executor system" in {
-    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
-    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
-
-    var systemId = 0
-    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
-    launcher.reply(LaunchExecutorSystemRejected(resource, "", session))
-    masterProxy.expectMsg(RequestResource(appId, resourceRequest))
-  }
-
-  it should "report failure when resource is allocated, but timeout " +
-    "when starting the executor system" in {
-    masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
-    val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
-
-    var systemId = 0
-    launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
-    launcher.reply(LaunchExecutorSystemTimeout(session))
-    client.expectMsg(StartExecutorSystemTimeout)
-  }
-}
-
-object ExecutorSystemSchedulerSpec {
-  class MockExecutorSystemLauncher(forwardTo: TestProbe, session: Session) extends Actor {
-    forwardTo.ref ! ExecutorSystemLauncherStarted(session)
-
-    def receive: Receive = {
-      case msg => forwardTo.ref forward msg
-    }
-  }
-
-  case class ExecutorSystemLauncherStarted(session: Session)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
deleted file mode 100644
index 3272b99..0000000
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.appmaster
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _}
-import io.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv
-import io.gearpump.cluster.master.MasterProxy.WatchMaster
-
-class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  implicit var system: ActorSystem = null
-  val register = RegisterAppMaster(null, null)
-  val appId = 0
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = {
-    val statusChangeSubscriber = TestProbe()
-    val master = TestProbe()
-
-    val keeper = system.actorOf(Props(
-      new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
-    statusChangeSubscriber.watch(keeper)
-
-    master.expectMsgType[WatchMaster]
-
-    // Master is alive, response to RegisterAppMaster
-    master.expectMsgType[RegisterAppMaster]
-    master.reply(AppMasterRegistered(appId))
-
-    // Notify listener that master is alive
-    statusChangeSubscriber.expectMsg(MasterConnected)
-    ConnectionKeeperTestEnv(master, keeper, statusChangeSubscriber)
-  }
-
-  it should "start correctly and notify listener that master is alive" in {
-    startMasterConnectionKeeper
-  }
-
-  it should "re-register the appmaster when master is restarted" in {
-    import io.gearpump.cluster.master.MasterProxy.MasterRestarted
-    val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
-
-    // Master is restarted
-    master.send(keeper, MasterRestarted)
-    master.expectMsgType[RegisterAppMaster]
-    master.reply(AppMasterRegistered(appId))
-    masterChangeListener.expectMsg(MasterConnected)
-
-    // Recovery from Master restart is transparent to listener
-    masterChangeListener.expectNoMsg()
-  }
-
-  it should "notify listener and then shutdown itself when master is dead" in {
-    val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
-
-    // Master is dead
-    master.send(keeper, MasterStopped)
-
-    // Keeper should tell the listener that master is stopped before shutting down itself
-    masterChangeListener.expectMsg(MasterStopped)
-    masterChangeListener.expectTerminated(keeper)
-  }
-
-  it should "mark the master as dead when timeout" in {
-    val statusChangeSubscriber = TestProbe()
-    val master = TestProbe()
-
-    // MasterConnectionKeeper register itself to master by sending RegisterAppMaster
-    val keeper = system.actorOf(Props(new MasterConnectionKeeper(register,
-      master.ref, statusChangeSubscriber.ref)))
-
-    // Master doesn't reply to keeper,
-    statusChangeSubscriber.watch(keeper)
-
-    // Timeout, keeper notify listener, and then make suicide
-    statusChangeSubscriber.expectMsg(60.seconds, MasterStopped)
-    statusChangeSubscriber.expectTerminated(keeper, 60.seconds)
-  }
-}
-
-object MasterConnectionKeeperSpec {
-  case class ConnectionKeeperTestEnv(
-      master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
deleted file mode 100644
index 7544f9a..0000000
--- a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.main
-
-import org.scalatest.{FlatSpec, Matchers}
-
-class ArgumentParserSpec extends FlatSpec with Matchers {
-  it should "parse arguments correctly" in {
-
-    val parser = new ArgumentsParser {
-      override val options = Array(
-        "flag" -> CLIOption[Any]("", required = true),
-        "opt1" -> CLIOption[Any]("", required = true),
-        "opt2" -> CLIOption[Any]("", required = true))
-    }
-
-    val result = parser.parse(Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2"))
-    assert(result.getBoolean("flag"))
-    assert(result.getInt("opt1") == 1)
-    assert(result.getString("opt1") == "1")
-    assert(result.getInt("opt2") == 2)
-
-    assert(result.remainArgs(0) == "arg1")
-    assert(result.remainArgs(1) == "arg2")
-  }
-
-  it should "handle interleaved options and remain args" in {
-
-    val parser = new ArgumentsParser {
-      override val options = Array(
-        "opt1" -> CLIOption[Any]("", required = true))
-    }
-
-    val result = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2"))
-    assert(result.getInt("opt1") == 1)
-
-    assert(result.remainArgs.length == 3)
-
-    intercept[Exception] {
-      parser.parse(Array("-opt2"))
-    }
-
-    intercept[Exception] {
-      parser.parse(Array("-opt2", "2", "-opt1"))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
deleted file mode 100644
index 2a8dba1..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import scala.util.Success
-
-import akka.actor._
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
-import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import io.gearpump.util.ActorSystemBooter._
-
-class AppMasterLauncherSpec extends FlatSpec with Matchers
-  with BeforeAndAfterEach with MasterHarness {
-
-  override def config: Config = TestUtil.DEFAULT_CONFIG
-
-  val appId = 1
-  val executorId = 2
-  var master: TestProbe = null
-  var client: TestProbe = null
-  var worker: TestProbe = null
-  var watcher: TestProbe = null
-  var appMasterLauncher: ActorRef = null
-
-  override def beforeEach(): Unit = {
-    startActorSystem()
-    master = createMockMaster()
-    client = TestProbe()(getActorSystem)
-    worker = TestProbe()(getActorSystem)
-    watcher = TestProbe()(getActorSystem)
-    appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId,
-      TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
-    watcher watch appMasterLauncher
-    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
-    val resource = ResourceAllocated(
-      Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
-    master.reply(resource)
-    worker.expectMsgType[LaunchExecutor]
-  }
-
-  override def afterEach(): Unit = {
-    shutdownActorSystem()
-  }
-
-  "AppMasterLauncher" should "launch appmaster correctly" in {
-    worker.reply(RegisterActorSystem("systempath"))
-    worker.expectMsgType[ActorSystemRegistered]
-
-    worker.expectMsgType[CreateActor]
-    worker.reply(ActorCreated(master.ref, "appmaster"))
-
-    client.expectMsg(SubmitApplicationResult(Success(appId)))
-    watcher.expectTerminated(appMasterLauncher)
-  }
-
-  "AppMasterLauncher" should "reallocate resource if executor launch rejected" in {
-    worker.reply(ExecutorLaunchRejected(""))
-    master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
-
-    val resource = ResourceAllocated(
-      Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
-    master.reply(resource)
-    worker.expectMsgType[LaunchExecutor]
-
-    worker.reply(RegisterActorSystem("systempath"))
-    worker.expectMsgType[ActorSystemRegistered]
-
-    worker.expectMsgType[CreateActor]
-    worker.reply(CreateActorFailed("", new Exception))
-    worker.expectMsgType[ShutdownExecutor]
-    assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
-    watcher.expectTerminated(appMasterLauncher)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
deleted file mode 100644
index 99cfc37..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import io.gearpump.cluster.appmaster.ApplicationState
-
-class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
-  "ApplicationState" should "check equal with respect to only appId and attemptId" in {
-    val stateA = ApplicationState(0, "application0", 0, null, null, null, "A")
-    val stateB = ApplicationState(0, "application0", 0, null, null, null, "B")
-    val stateC = ApplicationState(0, "application1", 1, null, null, null, "A")
-
-    assert(stateA == stateB)
-    assert(stateA.hashCode == stateB.hashCode)
-    assert(stateA != stateC)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
deleted file mode 100644
index b007120..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-class MasterProxySpec {
-
-  // Master proxy retries multiple times to find the master
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
deleted file mode 100644
index e4122da..0000000
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.master
-
-class MasterSpec {
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
deleted file mode 100644
index 3b3265f..0000000
--- a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.metrics
-
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
-
-class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
-
-  "Counter" should "handle sampleRate == 1" in {
-
-    val mockBaseCounter = mock[CodaHaleCounter]
-
-    val counter = new Counter("c", mockBaseCounter)
-
-    counter.inc()
-    counter.inc()
-
-    verify(mockBaseCounter, times(2)).inc(1)
-  }
-
-  "Counter" should "handle sampleRate == 3" in {
-
-    val mockBaseCounter = mock[CodaHaleCounter]
-
-    val counter = new Counter("c", mockBaseCounter, 3)
-
-    counter.inc(1)
-    counter.inc(1)
-    counter.inc(1)
-    counter.inc(1)
-    counter.inc(1)
-    counter.inc(1)
-
-    verify(mockBaseCounter, times(2)).inc(3)
-  }
-
-  "Histogram" should "handle sampleRate == 1" in {
-
-    val mockBaseHistogram = mock[CodaHaleHistogram]
-
-    val histogram = new Histogram("h", mockBaseHistogram)
-
-    histogram.update(3)
-    histogram.update(7)
-    histogram.update(5)
-    histogram.update(9)
-
-    verify(mockBaseHistogram, times(4)).update(anyLong())
-  }
-
-  "Histogram" should "handle sampleRate > 1" in {
-
-    val mockBaseHistogram = mock[CodaHaleHistogram]
-
-    val histogram = new Histogram("h", mockBaseHistogram, 2)
-
-    histogram.update(3)
-    histogram.update(4)
-    histogram.update(5)
-    histogram.update(6)
-
-    verify(mockBaseHistogram, times(1)).update(4L)
-    verify(mockBaseHistogram, times(1)).update(6L)
-  }
-
-  "Meter" should "handle sampleRate == 1" in {
-
-    val mockBaseMeter = mock[CodaHaleMeter]
-
-    val meter = new Meter("m", mockBaseMeter)
-
-    meter.mark()
-    meter.mark(3)
-
-    verify(mockBaseMeter, times(1)).mark(1L)
-    verify(mockBaseMeter, times(1)).mark(3L)
-  }
-
-  "Meter" should "handle sampleRate > 1" in {
-
-    val mockBaseMeter = mock[CodaHaleMeter]
-
-    val meter = new Meter("m", mockBaseMeter, 2)
-
-    meter.mark(1)
-    meter.mark(3)
-
-    meter.mark(5)
-    meter.mark(7)
-
-    verify(mockBaseMeter, times(1)).mark(4L)
-    verify(mockBaseMeter, times(1)).mark(12L)
-  }
-
-  "Metrics" should "have a name" in {
-    val metrics = new Metrics(3)
-    assert(metrics.counter("counter").name == "counter")
-    assert(metrics.histogram("histogram").name == "histogram")
-    assert(metrics.meter("meter").name == "meter")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
deleted file mode 100644
index 9509d94..0000000
--- a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.partitioner
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-
-class PartitionerSpec extends FlatSpec with Matchers {
-  val NUM = 10
-
-  "HashPartitioner" should "hash same key to same slots" in {
-    val partitioner = new HashPartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return" +
-      "consistent result")
-  }
-
-  "ShufflePartitioner" should "hash same key randomly" in {
-    val partitioner = new ShufflePartitioner
-
-    val data = new Array[Byte](1000)
-    (new java.util.Random()).nextBytes(data)
-    val msg = Message(data)
-
-    val partition = partitioner.getPartition(msg, NUM)
-    assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0")
-
-    assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return" +
-      "consistent result")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
deleted file mode 100644
index c38a555..0000000
--- a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-
-class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
-  it should "authenticate correctly" in {
-    val config = TestUtil.UI_CONFIG
-    implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config)
-    implicit val ec = system.dispatcher
-    val timeout = 30.seconds
-
-    val authenticator = new ConfigFileBasedAuthenticator(config)
-    val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout)
-    val admin = Await.result(authenticator.authenticate("admin", "admin", ec), timeout)
-
-    val nonexist = Await.result(authenticator.authenticate("nonexist", "nonexist", ec), timeout)
-
-    val failedGuest = Await.result(authenticator.authenticate("guest", "wrong", ec), timeout)
-    val failedAdmin = Await.result(authenticator.authenticate("admin", "wrong", ec), timeout)
-
-    assert(guest == Authenticator.Guest)
-    assert(admin == Authenticator.Admin)
-    assert(nonexist == Authenticator.UnAuthenticated)
-    assert(failedGuest == Authenticator.UnAuthenticated)
-    assert(failedAdmin == Authenticator.UnAuthenticated)
-
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
deleted file mode 100644
index 4a3963b..0000000
--- a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.security
-
-import org.scalatest.{FlatSpec, Matchers}
-
-class PasswordUtilSpec extends FlatSpec with Matchers {
-
-  it should "verify the credential correctly" in {
-    val password = "password"
-
-    val digest1 = PasswordUtil.hash(password)
-    val digest2 = PasswordUtil.hash(password)
-
-    // Uses different salt each time, thus creating different hash.
-    assert(digest1 != digest2)
-
-    // Both are valid hash.
-    assert(PasswordUtil.verify(password, digest1))
-    assert(PasswordUtil.verify(password, digest2))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
deleted file mode 100644
index 3ed6ffa..0000000
--- a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.serializer
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
-import io.gearpump.serializer.SerializerSpec._
-
-class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
-  val config = ConfigFactory.empty.withValue("gearpump.serializers",
-    ConfigValueFactory.fromAnyRef(Map(classOf[ClassA].getName -> classOf[ClassASerializer].getName,
-      classOf[ClassB].getName -> classOf[ClassBSerializer].getName).asJava))
-
-  "GearpumpSerialization" should "register custom serializers" in {
-    val serialization = new GearpumpSerialization(config)
-    val kryo = new Kryo
-    serialization.customize(kryo)
-
-    val forB = kryo.getRegistration(classOf[ClassB])
-    assert(forB.getSerializer.isInstanceOf[ClassBSerializer])
-
-    val forA = kryo.getRegistration(classOf[ClassA])
-    assert(forA.getSerializer.isInstanceOf[ClassASerializer])
-  }
-
-  "FastKryoSerializer" should "serialize correctly" in {
-    val myConfig = config.withFallback(TestUtil.DEFAULT_CONFIG.withoutPath("gearpump.serializers"))
-    val system = ActorSystem("my", myConfig)
-
-    val serializer = new FastKryoSerializer(system.asInstanceOf[ExtendedActorSystem])
-
-    val bytes = serializer.serialize(new ClassA)
-    val anotherA = serializer.deserialize(bytes)
-
-    assert(anotherA.isInstanceOf[ClassA])
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
-
-object SerializerSpec {
-
-  class ClassA {}
-
-  class ClassASerializer extends KryoSerializer[ClassA] {
-    override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {
-      output.writeString(classOf[ClassA].getName.toString)
-    }
-
-    override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
-      val className = input.readString()
-      Class.forName(className).newInstance().asInstanceOf[ClassA]
-    }
-  }
-
-  class ClassB {}
-
-  class ClassBSerializer extends KryoSerializer[ClassA] {
-    override def write(kryo: Kryo, output: Output, `object`: ClassA): Unit = {}
-
-    override def read(kryo: Kryo, input: Input, `type`: Class[ClassA]): ClassA = {
-      null
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
deleted file mode 100644
index 71b4218..0000000
--- a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-import java.io.{DataInput, DataOutput}
-
-import io.gearpump.transport.MockTransportSerializer.NettyMessage
-import io.gearpump.transport.netty.ITransportMessageSerializer
-
-class MockTransportSerializer extends ITransportMessageSerializer {
-  override def getLength(obj: scala.Any): Int = 4
-
-  override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = {
-    transportMessage match {
-      case msg: NettyMessage =>
-        dataOutput.writeInt(msg.num)
-    }
-  }
-
-  override def deserialize(dataInput: DataInput, length: Int): AnyRef = {
-    NettyMessage(dataInput.readInt())
-  }
-}
-
-object MockTransportSerializer {
-  case class NettyMessage(num: Int)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/transport/NettySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/NettySpec.scala b/core/src/test/scala/io/gearpump/transport/NettySpec.scala
deleted file mode 100644
index 6caf357..0000000
--- a/core/src/test/scala/io/gearpump/transport/NettySpec.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.transport
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.testkit.TestProbe
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.transport.MockTransportSerializer.NettyMessage
-import io.gearpump.transport.netty.{Context, TaskMessage}
-import io.gearpump.util.Util
-
-class NettySpec extends FlatSpec with Matchers with MockitoSugar {
-
-  "Netty Transport" should "send and receive message correctly " in {
-    val conf = TestUtil.DEFAULT_CONFIG
-    val system = ActorSystem("transport", conf)
-    val context = new Context(system, conf)
-    val serverActor = TestProbe()(system)
-
-    val port = Util.findFreePort()
-
-    import system.dispatcher
-    system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) {
-      context.bind("server", new ActorLookupById {
-        override def lookupLocalActor(id: Long): Option[ActorRef] = Some(serverActor.ref)
-      }, false, port.get)
-    }
-    val client = context.connect(HostPort("127.0.0.1", port.get))
-
-    val data = NettyMessage(0)
-    val msg = new TaskMessage(0, 1, 2, data)
-    client ! msg
-    serverActor.expectMsg(15.seconds, data)
-
-    context.close()
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
deleted file mode 100644
index 13530ed..0000000
--- a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _}
-import io.gearpump.util.ActorSystemBooterSpec._
-
-class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
-
-  "ActorSystemBooter" should "report its address back" in {
-    val boot = bootSystem()
-    boot.prob.expectMsgType[RegisterActorSystem]
-    boot.shutdown()
-  }
-
-  "ActorSystemBooter" should "terminate itself when parent actor dies" in {
-    val boot = bootSystem()
-    boot.prob.expectMsgType[RegisterActorSystem]
-
-    val dummy = boot.host.actorOf(Props(classOf[Dummy]), "dummy")
-    boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
-    boot.prob.reply(BindLifeCycle(dummy))
-    boot.host.stop(dummy)
-    val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted)
-    assert(terminated)
-    boot.shutdown()
-  }
-
-  "ActorSystemBooter" should "create new actor" in {
-    val boot = bootSystem()
-    boot.prob.expectMsgType[RegisterActorSystem]
-    boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
-    boot.prob.reply(CreateActor(Props(classOf[AcceptThreeArguments], 1, 2, 3), "three"))
-    boot.prob.expectMsgType[ActorCreated]
-
-    boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero"))
-    boot.prob.expectMsgType[ActorCreated]
-
-    boot.shutdown()
-  }
-
-  private def bootSystem(): Boot = {
-    val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG)
-
-    val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG)
-
-    val receiver = TestProbe()(system)
-    val address = ActorUtil.getFullPath(system, receiver.ref.path)
-
-    val bootSystem = booter.boot("booter", address)
-
-    Boot(system, receiver, bootSystem)
-  }
-
-  case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) {
-    def shutdown(): Unit = {
-      host.terminate()
-      bootedSystem.terminate()
-      Await.result(host.whenTerminated, Duration.Inf)
-      Await.result(bootedSystem.whenTerminated, Duration.Inf)
-    }
-  }
-
-  def retry(seconds: Int)(fn: => Boolean): Boolean = {
-    val result = fn
-    if (result) {
-      result
-    } else {
-      Thread.sleep(1000)
-      retry(seconds - 1)(fn)
-    }
-  }
-}
-
-object ActorSystemBooterSpec {
-  class Dummy extends Actor {
-    def receive: Receive = {
-      case _ =>
-    }
-  }
-
-  class AcceptZeroArguments extends Actor {
-    def receive: Receive = {
-      case _ =>
-    }
-  }
-
-  class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor {
-    def receive: Receive = {
-      case _ =>
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
deleted file mode 100644
index 6ab5a2f..0000000
--- a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import org.scalatest.FlatSpec
-
-import io.gearpump.transport.HostPort
-
-class ActorUtilSpec extends FlatSpec {
-  "masterActorPath" should "construct the ActorPath from HostPort" in {
-    import io.gearpump.util.Constants.MASTER
-
-    val host = "127.0.0.1"
-    val port = 3000
-    val master = HostPort("127.0.0.1", 3000)
-    val masterPath = ActorUtil.getMasterActorPath(master)
-    assert(masterPath.address.port == Some(port))
-    assert(masterPath.address.system == MASTER)
-    assert(masterPath.address.host == Some(host))
-    assert(masterPath.address.protocol == "akka.tcp")
-    assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
deleted file mode 100644
index 0c798f3..0000000
--- a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig}
-
-class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
-  "Typesafe Cluster Configs" should "follow the override rules" in {
-
-    val conf =
-      """
-      gearpump {
-        gear = "gearpump"
-      }
-
-      gearpump-master {
-        conf = "master"
-      }
-      gearpump-worker {
-        conf = "worker"
-      }
-      conf = "base"
-      """
-
-    val file = File.createTempFile("test", ".conf")
-    FileUtils.write(file, conf)
-
-    val raw = ClusterConfig.load(ClusterConfigSource(file.toString))
-
-    assert(raw.master.getString("conf") == "master", "master > base")
-    assert(raw.worker.getString("conf") == "worker", "worker > base")
-    assert(raw.default.getString("conf") == "base", "application > base")
-
-    file.delete()
-  }
-
-  "ClusterConfigSource" should "return empty for non-exist files" in {
-    val source = ClusterConfigSource("non-exist")
-    var config = source.getConfig
-    assert(config.isEmpty)
-
-    val nullCheck = ClusterConfigSource(null)
-    config = nullCheck.getConfig
-    assert(config.isEmpty)
-  }
-
-  "User Config" should "work" in {
-
-    implicit val system = ActorSystem("forSerialization")
-
-    val map = Map[String, String]("key1" -> "1", "key2" -> "value2")
-
-    val user = new UserConfig(map)
-      .withLong("key3", 2L)
-      .withBoolean("key4", value = true)
-      .withFloat("key5", 3.14F)
-      .withDouble("key6", 2.718)
-
-    assert(user.getInt("key1").get == 1)
-    assert(user.getString("key1").get == "1")
-    assert(user.getLong("key3").get == 2L)
-    assert(user.getBoolean("key4").get == true)
-    assert(user.getFloat("key5").get == 3.14F)
-    assert(user.getDouble("key6").get == 2.718)
-
-    val data = new ConfigsSpec.Data(3)
-    assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get)
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
-
-object ConfigsSpec {
-  case class Data(value: Int)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
deleted file mode 100644
index 30e42c7..0000000
--- a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import java.io.File
-import java.util
-
-import org.scalatest.FlatSpec
-
-import io.gearpump.google.common.io.Files
-
-class FileUtilsSpec extends FlatSpec {
-  val TXT =
-    """
-      |This is a multiple line
-      |text
-      |
-    """.stripMargin
-
-  it should "read/write string correctly" in {
-    val file = File.createTempFile("fileutilspec", ".test")
-    FileUtils.write(file, TXT)
-    assert(FileUtils.read(file) == TXT)
-    file.delete()
-  }
-
-  it should "read/write bytes array correctly" in {
-    val file = File.createTempFile("fileutilspec", ".test")
-    val bytes = TXT.toCharArray.map(_.toByte)
-    FileUtils.writeByteArrayToFile(file, bytes)
-    util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file))
-    file.delete()
-  }
-
-  it should "create directory and all parents" in {
-    val temp = Files.createTempDir()
-    val parent = new File(temp, "sub1")
-    val child = new File(parent, "sub2" + File.separator)
-    FileUtils.forceMkdir(child)
-    assert(child.exists())
-    assert(child.isDirectory)
-    child.delete()
-    parent.delete()
-    temp.delete()
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/GraphSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/GraphSpec.scala b/core/src/test/scala/io/gearpump/util/GraphSpec.scala
deleted file mode 100644
index 20eab6d..0000000
--- a/core/src/test/scala/io/gearpump/util/GraphSpec.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.util.Graph.{Node, Path}
-
-class GraphSpec extends PropSpec with PropertyChecks with Matchers {
-
-  case class Vertex(id: Int)
-  case class Edge(from: Int, to: Int)
-
-  val vertexCount = 100
-
-  property("Graph with no edges should be built correctly") {
-    val vertexSet = Set("A", "B", "C")
-    val graph = Graph(vertexSet.toSeq.map(Node): _*)
-    graph.vertices.toSet shouldBe vertexSet
-  }
-
-  property("Graph with vertices and edges should be built correctly") {
-    val vertices: Array[Vertex] = 0.until(vertexCount).map(Vertex).toArray
-    val genEdge = for {
-      from <- Gen.chooseNum[Int](0, vertexCount - 1)
-      to <- Gen.chooseNum[Int](0, vertexCount - 1)
-    } yield Edge(from, to)
-
-    var graphElements = Array.empty[Path[Vertex, _ <: Edge]]
-    val outDegrees = new Array[Int](vertices.length)
-    val outGoingEdges = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)])
-    val edgesOf = vertices.map(_ => Set.empty[(Vertex, Edge, Vertex)])
-    vertices.foreach { v =>
-      graphElements :+= Node(v)
-    }
-
-    forAll(genEdge) {
-      e: Edge =>
-        val from = vertices(e.from)
-        val to = vertices(e.to)
-        graphElements :+= from ~ e ~> to
-        outDegrees(e.from) += 1
-
-        val nodeEdgeNode = (from, e, to)
-        outGoingEdges(e.from) += nodeEdgeNode
-
-        edgesOf(e.from) += nodeEdgeNode
-        edgesOf(e.to) += nodeEdgeNode
-    }
-
-    val graph: Graph[Vertex, Edge] = Graph(graphElements: _*)
-    graph.vertices should contain theSameElementsAs vertices
-
-    0.until(vertices.size).foreach { i =>
-      val v = vertices(i)
-      graph.outgoingEdgesOf(v) should contain theSameElementsAs outGoingEdges(i)
-      graph.edgesOf(v).sortBy(_._1.id)
-      graph.edgesOf(v) should contain theSameElementsAs edgesOf(i)
-    }
-  }
-
-  property("Check empty graph") {
-    val graph = Graph.empty[String, String]
-    assert(graph.isEmpty)
-  }
-
-  property("check level map for a graph") {
-    val graph = Graph.empty[String, String]
-
-    val defaultEdge = "edge"
-
-    graph.addVertex("A")
-    graph.addVertex("B")
-    graph.addVertex("C")
-
-    graph.addEdge("A", defaultEdge, "B")
-    graph.addEdge("B", defaultEdge, "C")
-    graph.addEdge("A", defaultEdge, "C")
-
-    graph.addVertex("D")
-    graph.addVertex("E")
-    graph.addVertex("F")
-
-    graph.addEdge("D", defaultEdge, "E")
-    graph.addEdge("E", defaultEdge, "F")
-    graph.addEdge("D", defaultEdge, "F")
-
-    graph.addEdge("C", defaultEdge, "E")
-
-    val levelMap = graph.vertexHierarchyLevelMap()
-
-    // Check whether the rule holds: : if vertex A -> B, then level(A) < level(B)
-    levelMap("A") < levelMap("B")
-    levelMap("A") < levelMap("C")
-    levelMap("B") < levelMap("C")
-
-    levelMap("D") < levelMap("E")
-    levelMap("D") < levelMap("F")
-    levelMap("E") < levelMap("F")
-
-    levelMap("C") < levelMap("F")
-  }
-
-  property("copy should return a immutalbe new Graph") {
-    val graph = Graph.empty[String, String]
-    val defaultEdge = "edge"
-    graph.addVertex("A")
-    graph.addVertex("B")
-    graph.addEdge("A", defaultEdge, "B")
-
-    val newGraph = graph.copy
-    newGraph.addVertex("C")
-
-    assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable")
-  }
-
-  property("subGraph should return a sub-graph for certain vertex") {
-    val graph = Graph.empty[String, String]
-    val defaultEdge = "edge"
-    graph.addVertex("A")
-    graph.addVertex("B")
-    graph.addVertex("C")
-    graph.addEdge("A", defaultEdge, "B")
-    graph.addEdge("B", defaultEdge, "C")
-    graph.addEdge("A", defaultEdge, "C")
-
-    val subGraph = graph.subGraph("C")
-    assert(subGraph.outDegreeOf("A") != graph.outDegreeOf("A"))
-  }
-
-  property("replaceVertex should hold all upstream downstream relation for a vertex") {
-    val graph = Graph.empty[String, String]
-    val defaultEdge = "edge"
-    graph.addVertex("A")
-    graph.addVertex("B")
-    graph.addVertex("C")
-    graph.addEdge("A", defaultEdge, "B")
-    graph.addEdge("B", defaultEdge, "C")
-
-    val newGraph = graph.copy.replaceVertex("B", "D")
-    assert(newGraph.inDegreeOf("D") == graph.inDegreeOf("B"))
-    assert(newGraph.outDegreeOf("D") == graph.outDegreeOf("B"))
-  }
-
-  property("Cycle detecting should work properly") {
-    val graph = Graph.empty[String, String]
-    val defaultEdge = "edge"
-    graph.addVertex("A")
-    graph.addVertex("B")
-    graph.addVertex("C")
-    graph.addEdge("A", defaultEdge, "B")
-    graph.addEdge("B", defaultEdge, "C")
-
-    assert(!graph.hasCycle())
-
-    graph.addEdge("C", defaultEdge, "B")
-    assert(graph.hasCycle())
-
-    graph.addEdge("C", defaultEdge, "A")
-    assert(graph.hasCycle())
-  }
-
-  property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " +
-    "return equal order of graph with no circle") {
-    val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5)
-    val topoNoCircles = graph.topologicalOrderIterator
-    val topoWithCircles = graph.topologicalOrderWithCirclesIterator
-
-    assert(topoNoCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
-  }
-
-  property("Topological sort of graph with circles should work properly") {
-    val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7,
-      4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10)
-    val topoWithCircles = graph.topologicalOrderWithCirclesIterator
-    val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9)
-
-    assert(trueTopoWithCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
-  }
-
-  property("Duplicated edges detecting should work properly") {
-    val graph = Graph.empty[String, String]
-    val defaultEdge = "edge"
-    val anotherEdge = "edge2"
-    graph.addVertex("A")
-    graph.addVertex("B")
-    graph.addEdge("A", defaultEdge, "B")
-
-    assert(!graph.hasDuplicatedEdge())
-
-    graph.addEdge("A", anotherEdge, "B")
-
-    assert(graph.hasDuplicatedEdge())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
deleted file mode 100644
index ef362ff..0000000
--- a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.TestUtil
-
-class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
-  with WordSpecLike with Matchers with BeforeAndAfterAll {
-
-  def this() = this(ActorSystem("WorkerSpec", TestUtil.DEFAULT_CONFIG))
-  val mockActor = TestProbe()
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-
-  "The TimeOutScheduler" should {
-    "handle the time out event" in {
-      val testActorRef = TestActorRef(Props(classOf[TestActor], mockActor.ref))
-      val testActor = testActorRef.underlyingActor.asInstanceOf[TestActor]
-      testActor.sendMsgToIgnore()
-      mockActor.expectMsg(30.seconds, MessageTimeOut)
-    }
-  }
-}
-
-case object Echo
-case object MessageTimeOut
-
-class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  val target = context.actorOf(Props(classOf[EchoActor]))
-
-  override def receive: Receive = {
-    case _ =>
-  }
-
-  def sendMsgToIgnore(): Unit = {
-    sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut())
-  }
-
-  private def sendMsgTimeOut(): Unit = {
-    mock ! MessageTimeOut
-  }
-}
-
-class EchoActor extends Actor {
-  override def receive: Receive = {
-    case _ =>
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/util/UtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/UtilSpec.scala b/core/src/test/scala/io/gearpump/util/UtilSpec.scala
deleted file mode 100644
index b5bde04..0000000
--- a/core/src/test/scala/io/gearpump/util/UtilSpec.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.util
-
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Util._
-
-class UtilSpec extends FlatSpec with Matchers with MockitoSugar {
-  it should "work" in {
-
-    assert(findFreePort().isSuccess)
-
-    assert(randInt() != randInt())
-
-    val hosts = parseHostList("host1:1,host2:2")
-    assert(hosts(1) == HostPort("host2", 2))
-
-    assert(Util.getCurrentClassPath.length > 0)
-  }
-
-  it should "check application name properly" in {
-    assert(Util.validApplicationName("_application_1"))
-    assert(Util.validApplicationName("application_1_"))
-    assert(!Util.validApplicationName("0_application_1"))
-    assert(!Util.validApplicationName("_applicat&&ion_1"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala b/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala
new file mode 100644
index 0000000..6ab7eed
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/TestProbeUtil.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump
+
+import scala.language.implicitConversions
+
+import akka.actor.{Actor, Props, Terminated}
+import akka.testkit.TestProbe
+
+object TestProbeUtil {
+  implicit def toProps(probe: TestProbe): Props = {
+    Props(new Actor {
+      val probeRef = probe.ref
+      context.watch(probeRef)
+      def receive: Receive = {
+        case Terminated(probeRef) => context.stop(self)
+        case x => probeRef.forward(x)
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
new file mode 100644
index 0000000..f2f374e
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MasterHarness.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import java.io.File
+import java.net.{InetSocketAddress, Socket, SocketTimeoutException, URLClassLoader, UnknownHostException}
+import java.util.Properties
+import java.util.concurrent.{Executors, TimeUnit}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
+
+import akka.actor.{Actor, ActorSystem, Address, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigValueFactory}
+
+import org.apache.gearpump.cluster.MasterHarness.MockMaster
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, FileUtils, LogUtil}
+
+trait MasterHarness {
+  private val LOG = LogUtil.getLogger(getClass)
+
+  implicit val pool = MasterHarness.cachedPool
+
+  private var system: ActorSystem = null
+  private var systemAddress: Address = null
+  private var host: String = null
+  private var port: Int = 0
+  private var masterProperties = new Properties()
+  val PROCESS_BOOT_TIME = Duration(25, TimeUnit.SECONDS)
+
+  def getActorSystem: ActorSystem = system
+  def getHost: String = host
+  def getPort: Int = port
+
+  protected def config: Config
+
+  def startActorSystem(): Unit = {
+    val systemConfig = config
+    system = ActorSystem(MASTER, systemConfig)
+    systemAddress = ActorUtil.getSystemAddress(system)
+    host = systemAddress.host.get
+    port = systemAddress.port.get
+
+    masterProperties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$getHost:$getPort")
+    masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost")
+
+    LOG.info(s"Actor system is started, $host, $port")
+  }
+
+  def shutdownActorSystem(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+    LOG.info(s"Actor system is stopped, $host, $port")
+  }
+
+  def convertTestConf(host: String, port: Int): File = {
+    val test = ConfigFactory.parseResourcesAnySyntax("test.conf",
+      ConfigParseOptions.defaults.setAllowMissing(true))
+
+    val newConf = test.withValue(GEARPUMP_CLUSTER_MASTERS,
+      ConfigValueFactory.fromAnyRef(Array(s"$host:$port").toList.asJava))
+
+    val confFile = File.createTempFile("main", ".conf")
+    val serialized = newConf.root().render()
+    FileUtils.write(confFile, serialized)
+    confFile
+  }
+
+  def createMockMaster(): TestProbe = {
+    val masterReceiver = TestProbe()(system)
+    val master = system.actorOf(Props(classOf[MockMaster], masterReceiver), MASTER)
+    masterReceiver
+  }
+
+  def isPortUsed(host: String, port: Int): Boolean = {
+
+    var isPortUsed = true
+    val socket = new Socket()
+    try {
+      socket.setReuseAddress(true)
+      socket.connect(new InetSocketAddress(host, port), 1000)
+      socket.isConnected
+    } catch {
+      case ex: SocketTimeoutException =>
+        isPortUsed = false
+      case ex: UnknownHostException =>
+        isPortUsed = false
+      case ex: Throwable =>
+        // For other case, we think the port has been occupied.
+        isPortUsed = true
+    } finally {
+      socket.close()
+    }
+    isPortUsed
+  }
+
+  def getContextClassPath: Array[String] = {
+    val contextLoader = Thread.currentThread().getContextClassLoader()
+
+    val urlLoader = if (!contextLoader.isInstanceOf[URLClassLoader]) {
+      contextLoader.getParent.asInstanceOf[URLClassLoader]
+    } else {
+      contextLoader.asInstanceOf[URLClassLoader]
+    }
+
+    val urls = urlLoader.getURLs()
+    val classPath = urls.map { url =>
+      new File(url.getPath()).toString
+    }
+    classPath
+  }
+
+  /**
+   * Remove trailing $
+   */
+  def getMainClassName(mainObj: Any): String = {
+    mainObj.getClass.getName.dropRight(1)
+  }
+
+  def getMasterListOption(): Array[String] = {
+    masterProperties.asScala.toList.map { kv =>
+      s"-D${kv._1}=${kv._2}"
+    }.toArray
+  }
+
+  def masterConfig: Config = {
+    ConfigFactory.parseProperties(masterProperties).withFallback(system.settings.config)
+  }
+}
+
+object MasterHarness {
+
+  val cachedPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+
+  class MockMaster(receiver: TestProbe) extends Actor {
+    def receive: Receive = {
+      case msg => {
+        receiver.ref forward msg
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala b/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala
new file mode 100644
index 0000000..5d8727e
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/TestUtil.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import akka.actor._
+
+object TestUtil {
+  val DEFAULT_CONFIG = ClusterConfig.default("test.conf")
+  val MASTER_CONFIG = ClusterConfig.master("test.conf")
+  val UI_CONFIG = ClusterConfig.ui("test.conf")
+
+  class DummyAppMaster(context: AppMasterContext, app: AppDescription) extends ApplicationMaster {
+    context.masterProxy !(context, app)
+
+    def receive: Receive = null
+  }
+
+  val dummyApp: AppDescription =
+    AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty)
+}
\ No newline at end of file