You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:55 UTC

[45/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
index 01d3474..00bd408 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
@@ -18,8 +18,13 @@
 
 package io.gearpump.cluster.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor._
 import akka.testkit.TestProbe
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.TestProbeUtil._
 import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
 import io.gearpump.cluster._
@@ -27,27 +32,28 @@ import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
 import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironmentSpec.TestAppMasterEnv
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.StartExecutorSystems
 import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
-class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll  {
+class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   implicit var system: ActorSystem = null
 
-  override def beforeAll() = {
+  override def beforeAll(): Unit = {
     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
   }
 
-  override def afterAll() = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
-  "MasterWithExecutorSystemProvider" should "forward request StartExecutorSystem to ExecutorSystemProvider" in {
+  "MasterWithExecutorSystemProvider" should
+    "forward request StartExecutorSystem to ExecutorSystemProvider" in {
 
     val client = TestProbe()
     val master = TestProbe()
     val provider = TestProbe()
     val providerProps: Props = provider
-    val masterEnhanced = system.actorOf(Props(new MasterWithExecutorSystemProvider(master.ref, providerProps)))
+    val masterEnhanced = system.actorOf(Props(
+      new MasterWithExecutorSystemProvider(master.ref, providerProps)))
 
     val start = StartExecutorSystems(null, null)
     client.send(masterEnhanced, start)
@@ -76,7 +82,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
   }
 
   "AppMasterRuntimeEnvironment" should "start appMaster when master is connected" in {
-    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv
+    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+      setupAppMasterRuntimeEnv()
 
     masterConnectionKeeper.send(runtimeEnv, MasterConnected)
     appMaster.expectMsg(StartAppMaster)
@@ -84,7 +91,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
 
   "AppMasterRuntimeEnvironment" should "shutdown itself when master is stopped" in {
 
-    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv
+    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+      setupAppMasterRuntimeEnv()
 
     masterConnectionKeeper.send(runtimeEnv, MasterStopped)
     val client = TestProbe()
@@ -94,7 +102,8 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
 
   "AppMasterRuntimeEnvironment" should "shutdown itself when appMaster is stopped" in {
 
-    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) = setupAppMasterRuntimeEnv
+    val TestAppMasterEnv(master, appMaster, masterConnectionKeeper, runtimeEnv) =
+      setupAppMasterRuntimeEnv()
 
     val client = TestProbe()
     client.watch(runtimeEnv)
@@ -102,13 +111,13 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
     client.expectTerminated(runtimeEnv)
   }
 
-  private def setupAppMasterRuntimeEnv: TestAppMasterEnv = {
-    val appContext = AppMasterContext(0, null, null, null,  null, null, null)
+  private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = {
+    val appContext = AppMasterContext(0, null, null, null, null, null, null)
     val app = AppDescription("app", "AppMasterClass", null, null)
     val master = TestProbe()
     val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master)
     val appMaster = TestProbe()
-    val appMasterFactory = (_: AppMasterContext, _: AppDescription)=> toProps(appMaster)
+    val appMasterFactory = (_: AppMasterContext, _: AppDescription) => toProps(appMaster)
     val masterConnectionKeeper = TestProbe()
     val masterConnectionKeeperFactory =
       (_: MasterActorRef, _: RegisterAppMaster, _: ListenerActorRef) =>
@@ -125,5 +134,7 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with Matchers with Before
 
 object AppMasterRuntimeEnvironmentSpec {
 
-  case class TestAppMasterEnv(master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe, appMasterRuntimeEnv: ActorRef)
+  case class TestAppMasterEnv(
+      master: TestProbe, appMaster: TestProbe, connectionkeeper: TestProbe,
+      appMasterRuntimeEnv: ActorRef)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
index dc8e624..3595960 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala
@@ -18,24 +18,25 @@
 
 package io.gearpump.cluster.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigValueFactory
-import io.gearpump.WorkerId
-import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.TestUtil
 import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
 import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.Session
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
 import io.gearpump.util.Constants
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import scala.concurrent.duration._
 
-class ExecutorSystemLauncherSpec  extends FlatSpec with Matchers with BeforeAndAfterAll {
+class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   implicit var system: ActorSystem = null
   val workerId: WorkerId = WorkerId(0, 0L)
   val appId = 0
@@ -45,15 +46,15 @@ class ExecutorSystemLauncherSpec  extends FlatSpec with Matchers with BeforeAndA
   val launchExecutorSystemTimeout = 3000
   val activeConfig = TestUtil.DEFAULT_CONFIG.
     withValue(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS,
-              ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))
+      ConfigValueFactory.fromAnyRef(launchExecutorSystemTimeout))
 
-  override def beforeAll() = {
+  override def beforeAll(): Unit = {
     system = ActorSystem("test", activeConfig)
   }
 
-  override def afterAll() = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "report success when worker launch the system successfully" in {
@@ -90,14 +91,15 @@ class ExecutorSystemLauncherSpec  extends FlatSpec with Matchers with BeforeAndA
     client.expectTerminated(launcher)
   }
 
-  it should "report timeout when trying to start a executor system on worker, and worker doesn't response" in {
+  it should "report timeout when trying to start a executor system on worker, " +
+    "and worker doesn't response" in {
     val client = TestProbe()
     val worker = TestProbe()
     val launcher = system.actorOf(Props(new ExecutorSystemLauncher(appId, session)))
     client.send(launcher, LaunchExecutorSystem(WorkerInfo(workerId, worker.ref), 0, Resource(1)))
     client.watch(launcher)
     val waitFor = launchExecutorSystemTimeout + 10000
-    client.expectMsgType[LaunchExecutorSystemTimeout](waitFor milliseconds)
-    client.expectTerminated(launcher, waitFor milliseconds)
+    client.expectMsgType[LaunchExecutorSystemTimeout](waitFor.milliseconds)
+    client.expectTerminated(launcher, waitFor.milliseconds)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
index 106d389..c2f6ee8 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala
@@ -18,20 +18,22 @@
 
 package io.gearpump.cluster.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.testkit.TestProbe
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.{ExecutorJVMConfig, AppJar, TestUtil}
 import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._
 import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._
 import io.gearpump.cluster.appmaster.ExecutorSystemSchedulerSpec.{ExecutorSystemLauncherStarted, MockExecutorSystemLauncher}
 import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{AppJar, TestUtil}
 import io.gearpump.jarstore.FilePath
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.concurrent.duration._
 
 class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
   val appId = 0
@@ -43,7 +45,7 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
   val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig)
 
   implicit var system: ActorSystem = null
-  var worker:TestProbe = null
+  var worker: TestProbe = null
   var workerInfo: WorkerInfo = null
   var masterProxy: TestProbe = null
   var launcher: TestProbe = null
@@ -58,20 +60,21 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
     client = TestProbe()
 
     val scheduler = system.actorOf(
-      Props(new ExecutorSystemScheduler(appId, masterProxy.ref,
-        (appId: Int, session:Session) => Props(new MockExecutorSystemLauncher(launcher, session)))))
+      Props(new ExecutorSystemScheduler(appId, masterProxy.ref, (appId: Int, session: Session) => {
+        Props(new MockExecutorSystemLauncher(launcher, session))
+      })))
 
     client.send(scheduler, start)
     masterProxy.expectMsg(RequestResource(appId, resourceRequest))
   }
 
   override def afterEach(): Unit = {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   private def launcherStarted(launcher: TestProbe): Option[ExecutorSystemLauncherStarted] = {
-    val launcherStarted = launcher.receiveOne(15 seconds)
+    val launcherStarted = launcher.receiveOne(15.seconds)
 
     launcherStarted match {
       case start: ExecutorSystemLauncherStarted => Some(start)
@@ -91,16 +94,18 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
     launcher.expectMsg(LaunchExecutorSystem(workerInfo, systemId, resource))
 
     val executorSystemProbe = TestProbe()
-    val executorSystem = ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
+    val executorSystem =
+      ExecutorSystem(systemId, null, executorSystemProbe.ref, resource, workerInfo)
     launcher.reply(LaunchExecutorSystemSuccess(executorSystem, session))
     client.expectMsg(ExecutorSystemStarted(executorSystem, Some(mockJar)))
   }
 
   it should "report failure when resource cannot be allocated" in {
-    client.expectMsg(30 seconds, StartExecutorSystemTimeout)
+    client.expectMsg(30.seconds, StartExecutorSystemTimeout)
   }
 
-  it should "schedule new resouce on new worker when target worker reject creating executor system" in {
+  it should "schedule new resouce on new worker " +
+    "when target worker reject creating executor system" in {
     masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
     val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
 
@@ -110,7 +115,8 @@ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndA
     masterProxy.expectMsg(RequestResource(appId, resourceRequest))
   }
 
-  it should "report failure when resource is allocated, but timeout when starting the executor system" in {
+  it should "report failure when resource is allocated, but timeout " +
+    "when starting the executor system" in {
     masterProxy.reply(ResourceAllocated(Array(ResourceAllocation(resource, worker.ref, workerId))))
     val ExecutorSystemLauncherStarted(session) = launcherStarted(launcher).get
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
index 9221ca3..3272b99 100644
--- a/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
@@ -18,51 +18,50 @@
 
 package io.gearpump.cluster.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.testkit.TestProbe
-import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.MasterConnected
-import io.gearpump.cluster.master.MasterProxy.WatchMaster
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
 import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
 import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
+import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, _}
 import io.gearpump.cluster.appmaster.MasterConnectionKeeperSpec.ConnectionKeeperTestEnv
-import io.gearpump.cluster.master.MasterProxy
 import io.gearpump.cluster.master.MasterProxy.WatchMaster
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
-import scala.concurrent.duration._
-
-class MasterConnectionKeeperSpec  extends FlatSpec with Matchers with BeforeAndAfterAll {
+class MasterConnectionKeeperSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
   implicit var system: ActorSystem = null
   val register = RegisterAppMaster(null, null)
   val appId = 0
 
-   override def beforeAll: Unit = {
-     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-   }
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
 
-   override def afterAll: Unit = {
-    system.shutdown()
-     system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   private def startMasterConnectionKeeper: ConnectionKeeperTestEnv = {
     val statusChangeSubscriber = TestProbe()
     val master = TestProbe()
 
-    val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
+    val keeper = system.actorOf(Props(
+      new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
     statusChangeSubscriber.watch(keeper)
 
     master.expectMsgType[WatchMaster]
 
-    //master is alive, response to RegisterAppMaster
+    // Master is alive, response to RegisterAppMaster
     master.expectMsgType[RegisterAppMaster]
     master.reply(AppMasterRegistered(appId))
 
-    //notify listener that master is alive
+    // Notify listener that master is alive
     statusChangeSubscriber.expectMsg(MasterConnected)
     ConnectionKeeperTestEnv(master, keeper, statusChangeSubscriber)
   }
@@ -75,23 +74,23 @@ class MasterConnectionKeeperSpec  extends FlatSpec with Matchers with BeforeAndA
     import io.gearpump.cluster.master.MasterProxy.MasterRestarted
     val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
 
-    //master is restarted
+    // Master is restarted
     master.send(keeper, MasterRestarted)
     master.expectMsgType[RegisterAppMaster]
     master.reply(AppMasterRegistered(appId))
     masterChangeListener.expectMsg(MasterConnected)
 
-    //Recovery from Master restart is transparent to listener
+    // Recovery from Master restart is transparent to listener
     masterChangeListener.expectNoMsg()
   }
 
   it should "notify listener and then shutdown itself when master is dead" in {
     val ConnectionKeeperTestEnv(master, keeper, masterChangeListener) = startMasterConnectionKeeper
 
-    //master is dead
+    // Master is dead
     master.send(keeper, MasterStopped)
 
-    //keeper should tell the listener that master is stopped before shutting down itself
+    // Keeper should tell the listener that master is stopped before shutting down itself
     masterChangeListener.expectMsg(MasterStopped)
     masterChangeListener.expectTerminated(keeper)
   }
@@ -100,18 +99,20 @@ class MasterConnectionKeeperSpec  extends FlatSpec with Matchers with BeforeAndA
     val statusChangeSubscriber = TestProbe()
     val master = TestProbe()
 
-    //keeper will register to master by sending RegisterAppMaster
-    val keeper = system.actorOf(Props(new MasterConnectionKeeper(register, master.ref, statusChangeSubscriber.ref)))
+    // MasterConnectionKeeper register itself to master by sending RegisterAppMaster
+    val keeper = system.actorOf(Props(new MasterConnectionKeeper(register,
+      master.ref, statusChangeSubscriber.ref)))
 
-    //master doesn't reply to keeper,
+    // Master doesn't reply to keeper,
     statusChangeSubscriber.watch(keeper)
 
-    //timeout, keeper notify listener, and then make suicide
-    statusChangeSubscriber.expectMsg(60 seconds, MasterStopped)
-    statusChangeSubscriber.expectTerminated(keeper, 60 seconds)
+    // Timeout, keeper notify listener, and then make suicide
+    statusChangeSubscriber.expectMsg(60.seconds, MasterStopped)
+    statusChangeSubscriber.expectTerminated(keeper, 60.seconds)
   }
 }
 
 object MasterConnectionKeeperSpec {
-  case class ConnectionKeeperTestEnv(master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
+  case class ConnectionKeeperTestEnv(
+      master: TestProbe, keeper: ActorRef, masterChangeListener: TestProbe)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
index f08e368..7544f9a 100644
--- a/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/main/ArgumentParserSpec.scala
@@ -30,7 +30,7 @@ class ArgumentParserSpec extends FlatSpec with Matchers {
         "opt2" -> CLIOption[Any]("", required = true))
     }
 
-    val result = parser.parse(Array("-flag" , "-opt1", "1","-opt2", "2", "arg1", "arg2"))
+    val result = parser.parse(Array("-flag", "-opt1", "1", "-opt2", "2", "arg1", "arg2"))
     assert(result.getBoolean("flag"))
     assert(result.getInt("opt1") == 1)
     assert(result.getString("opt1") == "1")
@@ -47,7 +47,7 @@ class ArgumentParserSpec extends FlatSpec with Matchers {
         "opt1" -> CLIOption[Any]("", required = true))
     }
 
-    val result = parser.parse(Array("-opt1", "1","xx.MainClass", "-opt2", "2"))
+    val result = parser.parse(Array("-opt1", "1", "xx.MainClass", "-opt2", "2"))
     assert(result.getInt("opt1") == 1)
 
     assert(result.remainArgs.length == 3)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
index 8ff2ea1..2a8dba1 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,27 @@
 
 package io.gearpump.cluster.master
 
+import scala.util.Success
+
 import akka.actor._
 import akka.testkit.TestProbe
-import io.gearpump.WorkerId
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
-import io.gearpump.cluster.AppMasterToWorker.{ShutdownExecutor, LaunchExecutor}
+import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
+import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import io.gearpump.cluster.scheduler.{ResourceRequest, Resource, ResourceAllocation}
+import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.cluster.{MasterHarness, TestUtil}
 import io.gearpump.util.ActorSystemBooter._
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.util.Success
 
-class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+class AppMasterLauncherSpec extends FlatSpec with Matchers
+  with BeforeAndAfterEach with MasterHarness {
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
   val appId = 1
   val executorId = 2
@@ -45,7 +48,7 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
   var watcher: TestProbe = null
   var appMasterLauncher: ActorRef = null
 
-  override def beforeEach() = {
+  override def beforeEach(): Unit = {
     startActorSystem()
     master = createMockMaster()
     client = TestProbe()(getActorSystem)
@@ -55,12 +58,13 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
       TestUtil.dummyApp, None, "username", master.ref, Some(client.ref)))
     watcher watch appMasterLauncher
     master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
-    val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
+    val resource = ResourceAllocated(
+      Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
     master.reply(resource)
     worker.expectMsgType[LaunchExecutor]
   }
 
-  override def afterEach() = {
+  override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 
@@ -79,7 +83,8 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa
     worker.reply(ExecutorLaunchRejected(""))
     master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)))
 
-    val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
+    val resource = ResourceAllocated(
+      Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L))))
     master.reply(resource)
     worker.expectMsgType[LaunchExecutor]
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
index c1d6144..99cfc37 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/ApplicationStateSpec.scala
@@ -18,11 +18,11 @@
 
 package io.gearpump.cluster.master
 
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
 import io.gearpump.cluster.appmaster.ApplicationState
-import io.gearpump.cluster.{AppJar, AppDescription}
-import org.scalatest.{BeforeAndAfterEach, Matchers, FlatSpec}
 
-class ApplicationStateSpec  extends FlatSpec with Matchers with BeforeAndAfterEach  {
+class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
   "ApplicationState" should "check equal with respect to only appId and attemptId" in {
     val stateA = ApplicationState(0, "application0", 0, null, null, null, "A")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
index 690d8ad..b007120 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/MasterProxySpec.scala
@@ -18,8 +18,7 @@
 
 package io.gearpump.cluster.master
 
-//TODO
 class MasterProxySpec {
 
-  // master proxy will retry to find the master
+  // Master proxy retries multiple times to find the master
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
index 972f9d5..e4122da 100644
--- a/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
+++ b/core/src/test/scala/io/gearpump/cluster/master/MasterSpec.scala
@@ -19,5 +19,4 @@
 package io.gearpump.cluster.master
 
 class MasterSpec {
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
index 1602d10..3b3265f 100644
--- a/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
+++ b/core/src/test/scala/io/gearpump/metrics/MetricsSpec.scala
@@ -18,13 +18,14 @@
 
 package io.gearpump.metrics
 
-import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
 
-class MetricsSpec  extends FlatSpec with Matchers with MockitoSugar{
+import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter, Histogram => CodaHaleHistogram, Meter => CodaHaleMeter}
+
+class MetricsSpec extends FlatSpec with Matchers with MockitoSugar {
 
   "Counter" should "handle sampleRate == 1" in {
 
@@ -83,7 +84,6 @@ class MetricsSpec  extends FlatSpec with Matchers with MockitoSugar{
     verify(mockBaseHistogram, times(1)).update(6L)
   }
 
-
   "Meter" should "handle sampleRate == 1" in {
 
     val mockBaseMeter = mock[CodaHaleMeter]
@@ -101,7 +101,7 @@ class MetricsSpec  extends FlatSpec with Matchers with MockitoSugar{
 
     val mockBaseMeter = mock[CodaHaleMeter]
 
-    val meter = new Meter("m",mockBaseMeter, 2)
+    val meter = new Meter("m", mockBaseMeter, 2)
 
     meter.mark(1)
     meter.mark(3)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
index d5a3c18..9509d94 100644
--- a/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
+++ b/core/src/test/scala/io/gearpump/partitioner/PartitionerSpec.scala
@@ -18,11 +18,12 @@
 
 package io.gearpump.partitioner
 
-import io.gearpump.Message
 import org.scalatest.{FlatSpec, Matchers}
 
-class PartitionerSpec  extends FlatSpec with Matchers {
-   val NUM = 10
+import io.gearpump.Message
+
+class PartitionerSpec extends FlatSpec with Matchers {
+  val NUM = 10
 
   "HashPartitioner" should "hash same key to same slots" in {
     val partitioner = new HashPartitioner

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
index c64cfa7..c38a555 100644
--- a/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
+++ b/core/src/test/scala/io/gearpump/security/ConfigFileBasedAuthenticatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,20 @@
 
 package io.gearpump.security
 
-import akka.actor.ActorSystem
-import io.gearpump.cluster.TestUtil
-import io.gearpump.security.Authenticator.AuthenticationResult
-import org.scalatest.{Matchers, FlatSpec}
-
 import scala.concurrent.Await
 import scala.concurrent.duration._
 
-class ConfigFileBasedAuthenticatorSpec  extends FlatSpec with Matchers {
+import akka.actor.ActorSystem
+import org.scalatest.{FlatSpec, Matchers}
+
+import io.gearpump.cluster.TestUtil
+
+class ConfigFileBasedAuthenticatorSpec extends FlatSpec with Matchers {
   it should "authenticate correctly" in {
     val config = TestUtil.UI_CONFIG
     implicit val system = ActorSystem("ConfigFileBasedAuthenticatorSpec", config)
     implicit val ec = system.dispatcher
-    val timeout = 30 seconds
+    val timeout = 30.seconds
 
     val authenticator = new ConfigFileBasedAuthenticator(config)
     val guest = Await.result(authenticator.authenticate("guest", "guest", ec), timeout)
@@ -48,7 +48,7 @@ class ConfigFileBasedAuthenticatorSpec  extends FlatSpec with Matchers {
     assert(failedGuest == Authenticator.UnAuthenticated)
     assert(failedAdmin == Authenticator.UnAuthenticated)
 
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
index 69abb80..4a3963b 100644
--- a/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
+++ b/core/src/test/scala/io/gearpump/security/PasswordUtilSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,7 @@
 
 package io.gearpump.security
 
-import org.scalatest.{Matchers, FlatSpec}
+import org.scalatest.{FlatSpec, Matchers}
 
 class PasswordUtilSpec extends FlatSpec with Matchers {
 
@@ -28,11 +28,10 @@ class PasswordUtilSpec extends FlatSpec with Matchers {
     val digest1 = PasswordUtil.hash(password)
     val digest2 = PasswordUtil.hash(password)
 
-    // we will use different salt each time, thus
-    // creating different hash.
+    // Uses different salt each time, thus creating different hash.
     assert(digest1 != digest2)
 
-    // both are valid hash.
+    // Both are valid hash.
     assert(PasswordUtil.verify(password, digest1))
     assert(PasswordUtil.verify(password, digest2))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
index c18f4d5..3ed6ffa 100644
--- a/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
+++ b/core/src/test/scala/io/gearpump/serializer/SerializerSpec.scala
@@ -18,16 +18,19 @@
 
 package io.gearpump.serializer
 
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{ActorSystem, ExtendedActorSystem}
-import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
-import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
 import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.serializer.SerializerSpec._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
 
-import scala.collection.JavaConverters._
+import io.gearpump.cluster.TestUtil
+import io.gearpump.esotericsoftware.kryo.io.{Input, Output}
+import io.gearpump.esotericsoftware.kryo.{Kryo, Serializer => KryoSerializer}
+import io.gearpump.serializer.SerializerSpec._
 
 class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
   val config = ConfigFactory.empty.withValue("gearpump.serializers",
@@ -56,8 +59,8 @@ class SerializerSpec extends FlatSpec with Matchers with MockitoSugar {
     val anotherA = serializer.deserialize(bytes)
 
     assert(anotherA.isInstanceOf[ClassA])
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
index 8582f29..71b4218 100644
--- a/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
+++ b/core/src/test/scala/io/gearpump/transport/MockTransportSerializer.scala
@@ -15,15 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.transport
 
 import java.io.{DataInput, DataOutput}
 
 import io.gearpump.transport.MockTransportSerializer.NettyMessage
 import io.gearpump.transport.netty.ITransportMessageSerializer
-import org.jboss.netty.buffer.ChannelBuffer
 
-class MockTransportSerializer extends ITransportMessageSerializer{
+class MockTransportSerializer extends ITransportMessageSerializer {
   override def getLength(obj: scala.Any): Int = 4
 
   override def serialize(dataOutput: DataOutput, transportMessage: scala.Any): Unit = {
@@ -38,6 +38,6 @@ class MockTransportSerializer extends ITransportMessageSerializer{
   }
 }
 
-object MockTransportSerializer{
-  case class NettyMessage(num : Int)
+object MockTransportSerializer {
+  case class NettyMessage(num: Int)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/transport/NettySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/transport/NettySpec.scala b/core/src/test/scala/io/gearpump/transport/NettySpec.scala
index a81e21d..6caf357 100644
--- a/core/src/test/scala/io/gearpump/transport/NettySpec.scala
+++ b/core/src/test/scala/io/gearpump/transport/NettySpec.scala
@@ -19,18 +19,19 @@
 package io.gearpump.transport
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
+import scala.concurrent.duration._
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.testkit.TestProbe
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
 import io.gearpump.cluster.TestUtil
 import io.gearpump.transport.MockTransportSerializer.NettyMessage
 import io.gearpump.transport.netty.{Context, TaskMessage}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
 import io.gearpump.util.Util
 
-import scala.concurrent.duration._
-
 class NettySpec extends FlatSpec with Matchers with MockitoSugar {
 
   "Netty Transport" should "send and receive message correctly " in {
@@ -39,7 +40,7 @@ class NettySpec extends FlatSpec with Matchers with MockitoSugar {
     val context = new Context(system, conf)
     val serverActor = TestProbe()(system)
 
-    val port = Util.findFreePort
+    val port = Util.findFreePort()
 
     import system.dispatcher
     system.scheduler.scheduleOnce(Duration(1, TimeUnit.SECONDS)) {
@@ -52,10 +53,10 @@ class NettySpec extends FlatSpec with Matchers with MockitoSugar {
     val data = NettyMessage(0)
     val msg = new TaskMessage(0, 1, 2, data)
     client ! msg
-    serverActor.expectMsg(15 seconds, data)
-    
-    context.close
-    system.shutdown()
-    system.awaitTermination()
+    serverActor.expectMsg(15.seconds, data)
+
+    context.close()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
index d5068fb..13530ed 100644
--- a/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/ActorSystemBooterSpec.scala
@@ -18,22 +18,24 @@
 
 package io.gearpump.util
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.testkit.TestProbe
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.ActorSystemBooter._
-import io.gearpump.util.ActorSystemBooterSpec._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
 
+import io.gearpump.cluster.TestUtil
+import io.gearpump.util.ActorSystemBooter.{ActorCreated, RegisterActorSystem, _}
+import io.gearpump.util.ActorSystemBooterSpec._
+
 class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
 
   "ActorSystemBooter" should "report its address back" in {
     val boot = bootSystem()
     boot.prob.expectMsgType[RegisterActorSystem]
-    boot.shutdown
+    boot.shutdown()
   }
 
   "ActorSystemBooter" should "terminate itself when parent actor dies" in {
@@ -44,9 +46,9 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
     boot.prob.reply(ActorSystemRegistered(boot.prob.ref))
     boot.prob.reply(BindLifeCycle(dummy))
     boot.host.stop(dummy)
-    val terminated = retry(5)(boot.bootedSystem.isTerminated)
+    val terminated = retry(5)(boot.bootedSystem.whenTerminated.isCompleted)
     assert(terminated)
-    boot.shutdown
+    boot.shutdown()
   }
 
   "ActorSystemBooter" should "create new actor" in {
@@ -59,10 +61,10 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
     boot.prob.reply(CreateActor(Props(classOf[AcceptZeroArguments]), "zero"))
     boot.prob.expectMsgType[ActorCreated]
 
-    boot.shutdown
+    boot.shutdown()
   }
 
-  private def bootSystem() : Boot = {
+  private def bootSystem(): Boot = {
     val booter = ActorSystemBooter(TestUtil.DEFAULT_CONFIG)
 
     val system = ActorSystem("reportback", TestUtil.DEFAULT_CONFIG)
@@ -75,16 +77,16 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
     Boot(system, receiver, bootSystem)
   }
 
-  case class Boot(host : ActorSystem, prob : TestProbe, bootedSystem : ActorSystem) {
-    def shutdown = {
-      host.shutdown()
-      bootedSystem.shutdown()
-      host.awaitTermination()
-      bootedSystem.awaitTermination()
+  case class Boot(host: ActorSystem, prob: TestProbe, bootedSystem: ActorSystem) {
+    def shutdown(): Unit = {
+      host.terminate()
+      bootedSystem.terminate()
+      Await.result(host.whenTerminated, Duration.Inf)
+      Await.result(bootedSystem.whenTerminated, Duration.Inf)
     }
   }
 
-  def retry(seconds: Int)(fn: => Boolean) : Boolean = {
+  def retry(seconds: Int)(fn: => Boolean): Boolean = {
     val result = fn
     if (result) {
       result
@@ -97,19 +99,19 @@ class ActorSystemBooterSpec extends FlatSpec with Matchers with MockitoSugar {
 
 object ActorSystemBooterSpec {
   class Dummy extends Actor {
-    def receive : Receive = {
+    def receive: Receive = {
       case _ =>
     }
   }
 
   class AcceptZeroArguments extends Actor {
-    def receive : Receive = {
+    def receive: Receive = {
       case _ =>
     }
   }
 
-  class AcceptThreeArguments(a : Int, b : Int, c : Int) extends Actor {
-    def receive : Receive = {
+  class AcceptThreeArguments(a: Int, b: Int, c: Int) extends Actor {
+    def receive: Receive = {
       case _ =>
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
index ec72810..6ab5a2f 100644
--- a/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/ActorUtilSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,23 @@
 
 package io.gearpump.util
 
+import org.scalatest.FlatSpec
+
 import io.gearpump.transport.HostPort
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, FlatSpec}
 
-class ActorUtilSpec  extends FlatSpec {
- "masterActorPath" should "construct the ActorPath from HostPort" in {
-   import Constants.MASTER
+class ActorUtilSpec extends FlatSpec {
+  "masterActorPath" should "construct the ActorPath from HostPort" in {
+    import io.gearpump.util.Constants.MASTER
 
-   val host = "127.0.0.1"
-   val port = 3000
-   val master = HostPort("127.0.0.1", 3000)
-   val masterPath = ActorUtil.getMasterActorPath(master)
-   assert(masterPath.address.port == Some(port))
-   assert(masterPath.address.system == MASTER)
-   assert(masterPath.address.host == Some(host))
-   assert(masterPath.address.protocol == "akka.tcp")
-   assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
- }
+    val host = "127.0.0.1"
+    val port = 3000
+    val master = HostPort("127.0.0.1", 3000)
+    val masterPath = ActorUtil.getMasterActorPath(master)
+    assert(masterPath.address.port == Some(port))
+    assert(masterPath.address.system == MASTER)
+    assert(masterPath.address.host == Some(host))
+    assert(masterPath.address.protocol == "akka.tcp")
+    assert(masterPath.toStringWithoutAddress == s"/user/$MASTER")
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
index 07e8121..0c798f3 100644
--- a/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/ConfigsSpec.scala
@@ -19,16 +19,20 @@
 package io.gearpump.util
 
 import java.io.File
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 
 import akka.actor.ActorSystem
-import io.gearpump.cluster.{ClusterConfigSource, ClusterConfig, UserConfig}
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
 
-class ConfigsSpec  extends FlatSpec with Matchers with MockitoSugar {
+import io.gearpump.cluster.{ClusterConfig, ClusterConfigSource, UserConfig}
+
+class ConfigsSpec extends FlatSpec with Matchers with MockitoSugar {
   "Typesafe Cluster Configs" should "follow the override rules" in {
 
-    val conf =  """
+    val conf =
+      """
       gearpump {
         gear = "gearpump"
       }
@@ -40,7 +44,7 @@ class ConfigsSpec  extends FlatSpec with Matchers with MockitoSugar {
         conf = "worker"
       }
       conf = "base"
-  """
+      """
 
     val file = File.createTempFile("test", ".conf")
     FileUtils.write(file, conf)
@@ -68,8 +72,7 @@ class ConfigsSpec  extends FlatSpec with Matchers with MockitoSugar {
 
     implicit val system = ActorSystem("forSerialization")
 
-
-    val map = Map[String,String]("key1"->"1", "key2"->"value2")
+    val map = Map[String, String]("key1" -> "1", "key2" -> "value2")
 
     val user = new UserConfig(map)
       .withLong("key3", 2L)
@@ -86,11 +89,11 @@ class ConfigsSpec  extends FlatSpec with Matchers with MockitoSugar {
 
     val data = new ConfigsSpec.Data(3)
     assert(data == user.withValue("data", data).getValue[ConfigsSpec.Data]("data").get)
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }
 
-object ConfigsSpec{
+object ConfigsSpec {
   case class Data(value: Int)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
index 1c954d5..30e42c7 100644
--- a/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/FileUtilsSpec.scala
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.util
 
 import java.io.File
 import java.util
+
 import org.scalatest.FlatSpec
+
 import io.gearpump.google.common.io.Files
 
 class FileUtilsSpec extends FlatSpec {
@@ -43,7 +44,7 @@ class FileUtilsSpec extends FlatSpec {
     val file = File.createTempFile("fileutilspec", ".test")
     val bytes = TXT.toCharArray.map(_.toByte)
     FileUtils.writeByteArrayToFile(file, bytes)
-    util.Arrays.equals(bytes,FileUtils.readFileToByteArray(file))
+    util.Arrays.equals(bytes, FileUtils.readFileToByteArray(file))
     file.delete()
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/GraphSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/GraphSpec.scala b/core/src/test/scala/io/gearpump/util/GraphSpec.scala
index 0028446..20eab6d 100644
--- a/core/src/test/scala/io/gearpump/util/GraphSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/GraphSpec.scala
@@ -20,9 +20,9 @@ package io.gearpump.util
 
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
-import org.scalatest.{PropSpec, Matchers}
+import org.scalatest.{Matchers, PropSpec}
 
-import io.gearpump.util.Graph.{Path, Node}
+import io.gearpump.util.Graph.{Node, Path}
 
 class GraphSpec extends PropSpec with PropertyChecks with Matchers {
 
@@ -33,7 +33,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
 
   property("Graph with no edges should be built correctly") {
     val vertexSet = Set("A", "B", "C")
-    val graph = Graph(vertexSet.toSeq.map(Node):_ *)
+    val graph = Graph(vertexSet.toSeq.map(Node): _*)
     graph.vertices.toSet shouldBe vertexSet
   }
 
@@ -107,7 +107,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
 
     val levelMap = graph.vertexHierarchyLevelMap()
 
-    //check whether the rule holds: : if vertex A -> B, then level(A) < level(B)
+    // Check whether the rule holds: : if vertex A -> B, then level(A) < level(B)
     levelMap("A") < levelMap("B")
     levelMap("A") < levelMap("C")
     levelMap("B") < levelMap("C")
@@ -178,8 +178,8 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
     assert(graph.hasCycle())
   }
 
-  property("topologicalOrderIterator " +
-    "and topologicalOrderWithCirclesIterator method should return equal order of graph with no circle") {
+  property("topologicalOrderIterator and topologicalOrderWithCirclesIterator method should " +
+    "return equal order of graph with no circle") {
     val graph = Graph(1 ~> 2 ~> 3, 4 ~> 2, 2 ~> 5)
     val topoNoCircles = graph.topologicalOrderIterator
     val topoWithCircles = graph.topologicalOrderWithCirclesIterator
@@ -188,9 +188,10 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers {
   }
 
   property("Topological sort of graph with circles should work properly") {
-    val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7, 4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10)
+    val graph = Graph(0 ~> 1 ~> 3 ~> 4 ~> 6 ~> 5 ~> 7,
+      4 ~> 1, 1 ~> 2 ~> 4, 7 ~> 6, 8 ~> 2, 6 ~> 9, 4 ~> 10)
     val topoWithCircles = graph.topologicalOrderWithCirclesIterator
-    val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6 ,5, 7, 10, 9)
+    val trueTopoWithCircles = Iterator[Int](0, 8, 1, 3, 4, 2, 6, 5, 7, 10, 9)
 
     assert(trueTopoWithCircles.zip(topoWithCircles).forall(x => x._1 == x._2))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
index 213bd96..ef362ff 100644
--- a/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/TimeOutSchedulerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,15 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.util
 
+import scala.concurrent.duration._
+
 import akka.actor._
 import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.TestUtil
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 import org.slf4j.Logger
-import scala.concurrent.duration._
+
+import io.gearpump.cluster.TestUtil
 
 class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
   with WordSpecLike with Matchers with BeforeAndAfterAll {
@@ -40,7 +42,7 @@ class TimeOutSchedulerSpec(_system: ActorSystem) extends TestKit(_system) with I
       val testActorRef = TestActorRef(Props(classOf[TestActor], mockActor.ref))
       val testActor = testActorRef.underlyingActor.asInstanceOf[TestActor]
       testActor.sendMsgToIgnore()
-      mockActor.expectMsg(30 seconds, MessageTimeOut)
+      mockActor.expectMsg(30.seconds, MessageTimeOut)
     }
   }
 }
@@ -60,7 +62,7 @@ class TestActor(mock: ActorRef) extends Actor with TimeOutScheduler {
   def sendMsgToIgnore(): Unit = {
     sendMsgWithTimeOutCallBack(target, Echo, 2000, sendMsgTimeOut())
   }
-  
+
   private def sendMsgTimeOut(): Unit = {
     mock ! MessageTimeOut
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/util/UtilSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/util/UtilSpec.scala b/core/src/test/scala/io/gearpump/util/UtilSpec.scala
index 8d614fe..b5bde04 100644
--- a/core/src/test/scala/io/gearpump/util/UtilSpec.scala
+++ b/core/src/test/scala/io/gearpump/util/UtilSpec.scala
@@ -18,17 +18,18 @@
 
 package io.gearpump.util
 
-import io.gearpump.transport.HostPort
-import io.gearpump.util.Util._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
 
+import io.gearpump.transport.HostPort
+import io.gearpump.util.Util._
+
 class UtilSpec extends FlatSpec with Matchers with MockitoSugar {
   it should "work" in {
 
-    assert(findFreePort.isSuccess)
+    assert(findFreePort().isSuccess)
 
-    assert(randInt != randInt)
+    assert(randInt() != randInt())
 
     val hosts = parseHostList("host1:1,host2:2")
     assert(hosts(1) == HostPort("host2", 2))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
index f0e0c5c..d226af9 100644
--- a/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
+++ b/daemon/src/main/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
@@ -1,2 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 io.gearpump.jarstore.local.LocalJarStoreService
 io.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
index ac942ed..420f1b6 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +18,34 @@
 package io.gearpump.cluster
 
 import akka.actor.ActorRef
-import io.gearpump.WorkerId
+
 import io.gearpump.cluster.master.Master.MasterInfo
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
 
 /**
  * Cluster Bootup Flow
  */
 object WorkerToMaster {
+
+  /** When an worker is started, it sends RegisterNewWorker */
   case object RegisterNewWorker
+
+  /** When worker lose connection with master, it tries to register itself again with old Id. */
   case class RegisterWorker(workerId: WorkerId)
+
+  /** Worker is responsible to broadcast its current status to master */
   case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
 }
 
 object MasterToWorker {
-  case class WorkerRegistered(workerId : WorkerId, masterInfo: MasterInfo)
-  case class UpdateResourceFailed(reason : String = null, ex: Throwable = null)
+
+  /** Master confirm the reception of RegisterNewWorker or RegisterWorker */
+  case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
+
+  /** Worker have not received reply from master */
+  case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
+
+  /** Master is synced with worker on resource slots managed by current worker */
   case object UpdateResourceSucceed
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
index 6be52b2..53da645 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,19 @@
 
 package io.gearpump.cluster.embedded
 
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{ConfigValueFactory, Config}
+import com.typesafe.config.{Config, ConfigValueFactory}
+
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.master.{Master => MasterActor}
 import io.gearpump.cluster.worker.{Worker => WorkerActor}
-import io.gearpump.util.{LogUtil, Constants, Util, ActorUtil}
-import io.gearpump.util.Constants.{GEARPUMP_METRIC_ENABLED, GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, MASTER, GEARPUMP_CLUSTER_MASTERS}
-import scala.collection.JavaConverters._
-
+import io.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
+import io.gearpump.util.{LogUtil, Util}
 
 /**
  * Create a in-process cluster with single worker
@@ -41,8 +44,8 @@ class EmbeddedCluster(inputConfig: Config) {
 
   private val LOG = LogUtil.getLogger(getClass)
 
-  def start: Unit = {
-    val port = Util.findFreePort.get
+  def start(): Unit = {
+    val port = Util.findFreePort().get
     val akkaConf = getConfig(inputConfig, port)
     _config = akkaConf
     val system = ActorSystem(MASTER, akkaConf)
@@ -64,10 +67,13 @@ class EmbeddedCluster(inputConfig: Config) {
   private def getConfig(inputConfig: Config, port: Int): Config = {
     val config = inputConfig.
       withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
-      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, ConfigValueFactory.fromAnyRef(true)).
+      withValue(GEARPUMP_CLUSTER_MASTERS,
+        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
+        ConfigValueFactory.fromAnyRef(true)).
       withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
-      withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+      withValue("akka.actor.provider",
+        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
     config
   }
 
@@ -75,14 +81,14 @@ class EmbeddedCluster(inputConfig: Config) {
     ClientContext(_config, _system, _master)
   }
 
-  def stop: Unit = {
+  def stop(): Unit = {
     _system.stop(_master)
-    _system.shutdown()
-    _system.awaitTermination()
+    _system.terminate()
+    Await.result(_system.whenTerminated, Duration.Inf)
   }
 }
 
-object EmbeddedCluster{
+object EmbeddedCluster {
   def apply(): EmbeddedCluster = {
     new EmbeddedCluster(ClusterConfig.master())
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
index 90f653c..68f778e 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,13 +20,13 @@ package io.gearpump.cluster.main
 import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.util.jar.JarFile
+import scala.util.Try
 
-import io.gearpump.util.{Util, Constants}
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
 import org.slf4j.Logger
 
-import scala.util.Try
+import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
 
+/** Tool to submit an application jar to cluster */
 object AppSubmitter extends AkkaApp with ArgumentsParser {
   val LOG: Logger = LogUtil.getLogger(getClass)
 
@@ -35,60 +35,66 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
   override val description = "Submit an application to Master by providing a jar"
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, defaultValue = Some("")),
+    "namePrefix" -> CLIOption[String]("<application name prefix>", required = false,
+      defaultValue = Some("")),
     "jar" -> CLIOption("<application>.jar", required = true),
-    "executors" -> CLIOption[Int]("number of executor to launch", required = false, defaultValue = Some(1)),
-    "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
+    "executors" -> CLIOption[Int]("number of executor to launch", required = false,
+      defaultValue = Some(1)),
+    "verbose" -> CLIOption("<print verbose log on console>", required = false,
+      defaultValue = Some(false)),
     // For document purpose only, OPTION_CONFIG option is not used here.
     // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
 
   def main(akkaConf: Config, args: Array[String]): Unit = {
 
     val config = parse(args)
-    if (null == config) {
-      return
-    }
+    if (null != config) {
 
-    val verbose = config.getBoolean("verbose")
-    if (verbose) {
-      LogUtil.verboseLogToConsole
-    }
+      val verbose = config.getBoolean("verbose")
+      if (verbose) {
+        LogUtil.verboseLogToConsole()
+      }
 
-    val jar = config.getString("jar")
+      val jar = config.getString("jar")
 
-    // Set jar path to be submitted to cluster
-    System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
-    System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
+      // Set jar path to be submitted to cluster
+      System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+      System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
 
-    val namePrefix = config.getString("namePrefix")
-    if (namePrefix.nonEmpty) {
-      if (!Util.validApplicationName(namePrefix)) {
-        throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+      val namePrefix = config.getString("namePrefix")
+      if (namePrefix.nonEmpty) {
+        if (!Util.validApplicationName(namePrefix)) {
+          throw new Exception(s"$namePrefix is not a valid prefix for an application name")
+        }
+        System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
       }
-      System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix)
-    }
 
-    val jarFile = new java.io.File(jar)
+      val jarFile = new java.io.File(jar)
 
-    //start main class
-    if (!jarFile.exists()) {
-      throw new Exception(s"jar $jar does not exist")
-    }
+      // Start main class
+      if (!jarFile.exists()) {
+        throw new Exception(s"jar $jar does not exist")
+      }
 
-    val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + jarFile.getAbsolutePath)),
-      Thread.currentThread().getContextClassLoader())
-    val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
+      val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" +
+        jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader())
+      val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader)
 
-    //set the context classloader as ActorSystem will use context classloader in precedence.
-    Thread.currentThread().setContextClassLoader(classLoader)
-    val clazz = classLoader.loadClass(main)
-    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
-    mainMethod.invoke(null, arguments)
+      // Set to context classloader. ActorSystem pick context classloader in preference
+      Thread.currentThread().setContextClassLoader(classLoader)
+      val clazz = classLoader.loadClass(main)
+      val mainMethod = clazz.getMethod("main", classOf[Array[String]])
+      mainMethod.invoke(null, arguments)
+    }
   }
 
-  private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader): (String, Array[String]) = {
-    val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.getValue("Main-Class")).getOrElse("")
+  private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader)
+    : (String, Array[String]) = {
+    val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes.
+      getValue("Main-Class")).getOrElse("")
+
     if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) {
       (remainArgs(0), remainArgs.drop(1))
     } else if (mainInManifest.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
index 31b7492..4423727 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
  */
 package io.gearpump.cluster.main
 
-import io.gearpump.util.{Constants, LogUtil}
 import org.slf4j.Logger
 
-object Gear  {
+import io.gearpump.util.{Constants, LogUtil}
+
+object Gear {
 
   val OPTION_CONFIG = "conf"
 
@@ -29,12 +30,14 @@ object Gear  {
   val commands = Map("app" -> AppSubmitter, "kill" -> Kill,
     "info" -> Info, "replay" -> Replay, "main" -> MainRunner)
 
-  def usage: Unit = {
+  def usage(): Unit = {
     val keys = commands.keys.toList.sorted
+    // scalastyle:off println
     Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+    // scalastyle:on println
   }
 
-  def executeCommand(command : String, commandArgs : Array[String]) = {
+  private def executeCommand(command: String, commandArgs: Array[String]) = {
     commands.get(command).map(_.main(commandArgs))
     if (!commands.contains(command)) {
       val allArgs = (command +: commandArgs.toList).toArray
@@ -42,15 +45,15 @@ object Gear  {
     }
   }
 
-  def main(inputArgs: Array[String]) = {
+  def main(inputArgs: Array[String]): Unit = {
     val (configFile, args) = extractConfig(inputArgs)
     if (configFile != null) {
-      // set custom config file...
+      // Sets custom config file...
       System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile)
     }
 
     if (args.length == 0) {
-      usage
+      usage()
     } else {
       val command = args(0)
       val commandArgs = args.drop(1)
@@ -62,7 +65,7 @@ object Gear  {
     var index = 0
 
     var result = List.empty[String]
-    var configFile:String = null
+    var configFile: String = null
     while (index < inputArgs.length) {
       val item = inputArgs(index)
       if (item == s"-$OPTION_CONFIG") {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
index 878bcbf..4922690 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,13 @@
  */
 package io.gearpump.cluster.main
 
+import org.slf4j.Logger
+
 import io.gearpump.cluster.MasterToAppMaster.AppMastersData
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.util.{AkkaApp, LogUtil}
-import org.slf4j.Logger
-
-import scala.util.Try
 
+/** Tool to query master info */
 object Info extends AkkaApp with ArgumentsParser {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -31,11 +31,12 @@ object Info extends AkkaApp with ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array(
     // For document purpose only, OPTION_CONFIG option is not used here.
     // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
-
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
 
   override val description = "Query the Application list"
 
+  // scalastyle:off println
   def main(akkaConf: Config, args: Array[String]): Unit = {
     val client = ClientContext(akkaConf)
 
@@ -48,4 +49,5 @@ object Info extends AkkaApp with ArgumentsParser {
     }
     client.close()
   }
+  // scalastyle:on println
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
index 195809c..3ce781f 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,12 @@
 
 package io.gearpump.cluster.main
 
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.util.{AkkaApp, LogUtil}
 import org.slf4j.Logger
 
-import scala.util.Try
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.util.{AkkaApp, LogUtil}
 
+/** Tool to kill an App */
 object Kill extends AkkaApp with ArgumentsParser {
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
@@ -32,20 +32,19 @@ object Kill extends AkkaApp with ArgumentsParser {
     "appid" -> CLIOption("<application id>", required = true),
     // For document purpose only, OPTION_CONFIG option is not used here.
     // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
 
   override val description = "Kill an application with application Id"
 
   def main(akkaConf: Config, args: Array[String]): Unit = {
     val config = parse(args)
 
-    if (null == config) {
-      return
+    if (null != config) {
+      val client = ClientContext(akkaConf)
+      LOG.info("Client ")
+      client.shutdown(config.getInt("appid"))
+      client.close()
     }
-
-    val client = ClientContext(akkaConf)
-    LOG.info("Client ")
-    client.shutdown(config.getInt("appid"))
-    client.close()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
index d6b2479..d5681df 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,20 @@
 
 package io.gearpump.cluster.main
 
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{ActorSystem, Props}
 import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.master.{Master => MasterActor}
 import io.gearpump.cluster.worker.{Worker => WorkerActor}
 import io.gearpump.util.Constants._
 import io.gearpump.util.LogUtil.ProcessType
-import io.gearpump.util.{AkkaApp, ActorUtil, Constants, LogUtil, Util}
-import org.slf4j.Logger
-
-import scala.collection.JavaConversions._
+import io.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
 
 object Local extends AkkaApp with ArgumentsParser {
   override def akkaConfig: Config = ClusterConfig.master()
@@ -37,7 +40,8 @@ object Local extends AkkaApp with ArgumentsParser {
 
   override val options: Array[(String, CLIOption[Any])] =
     Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
-          "workernum"-> CLIOption[Int]("<how many workers to start>", required = false, defaultValue = Some(2)))
+      "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
+        defaultValue = Some(2)))
 
   override val description = "Start a local cluster"
 
@@ -49,23 +53,23 @@ object Local extends AkkaApp with ArgumentsParser {
     }
 
     val config = parse(args)
-    if (null == config) {
-      return
+    if (null != config) {
+      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
     }
-    local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
   }
 
-  def local(workerCount : Int, sameProcess : Boolean, akkaConf: Config) : Unit = {
+  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
     if (sameProcess) {
       LOG.info("Starting local in same process")
       System.setProperty("LOCAL", "true")
     }
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList)
+    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
     val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
 
-    if(masters.size != 1 && masters.head.host != local) {
-      LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match with ${Constants.GEARPUMP_HOSTNAME}")
-
+    if (masters.size != 1 && masters.head.host != local) {
+      LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
+        s"with ${Constants.GEARPUMP_HOSTNAME}")
     } else {
 
       val hostPort = masters.head
@@ -80,7 +84,7 @@ object Local extends AkkaApp with ArgumentsParser {
         system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
       }
 
-      system.awaitTermination()
+      Await.result(system.whenTerminated, Duration.Inf)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
index ea235ed..923a646 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,19 @@
 
 package io.gearpump.cluster.main
 
-import io.gearpump.util.{LogUtil, AkkaApp}
-import io.gearpump.util.{AkkaApp, LogUtil}
 import org.slf4j.Logger
 
-import scala.util.Try
+import io.gearpump.util.{AkkaApp, LogUtil}
 
+/** Tool to run any main class by providing a jar */
 object MainRunner extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array(
     // For document purpose only, OPTION_CONFIG option is not used here.
     // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
-    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None))
+    Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false,
+      defaultValue = None))
 
   def main(akkaConf: Config, args: Array[String]): Unit = {
     val mainClazz = args(0)