You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:53 UTC

[43/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
index 9ec7f35..969ce90 100644
--- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
+++ b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,45 +19,48 @@ package io.gearpump.jarstore.local
 
 import java.io.File
 import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
 
+import akka.actor.{ActorRef, ActorSystem}
 import akka.pattern.ask
-import akka.actor.{ActorRef, ActorSystem, ActorRefFactory}
-import io.gearpump.cluster.ClientToMaster.{JarStoreServerAddress, GetJarStoreServer}
-import io.gearpump.cluster.master.MasterProxy
 import com.typesafe.config.Config
-import io.gearpump.jarstore.{FilePath, JarStoreService}
-import io.gearpump.util._
-import scala.collection.JavaConversions._
 import org.slf4j.Logger
 
-import scala.concurrent.duration.Duration
-import scala.concurrent.{ExecutionContext, Await, Future}
+import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import io.gearpump.cluster.master.MasterProxy
+import io.gearpump.jarstore.{FilePath, JarStoreService}
+import io.gearpump.util._
 
 /**
  * LocalJarStoreService store the uploaded jar on local disk.
  */
-class LocalJarStoreService extends JarStoreService{
+class LocalJarStoreService extends JarStoreService {
   private def LOG: Logger = LogUtil.getLogger(getClass)
   private implicit val timeout = Constants.FUTURE_TIMEOUT
-  private var system : akka.actor.ActorSystem = null
-  private var master : ActorRef = null
+  private var system: akka.actor.ActorSystem = null
+  private var master: ActorRef = null
   private implicit def dispatcher: ExecutionContext = system.dispatcher
 
   override val scheme: String = "file"
 
   override def init(config: Config, system: ActorSystem): Unit = {
     this.system = system
-    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList)
-    master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt}")
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
   }
 
-  private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]].map { address =>
-    val client = new FileServer.Client(system, address.url)
-    client
-  }
+  private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]]
+    .map { address =>
+      val client = new FileServer.Client(system, address.url)
+      client
+    }
 
   /**
    * This function will copy the remote file to local file system, called from client side.
+   *
    * @param localFile The destination of file path
    * @param remotePath The remote file path from JarStore
    */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
index c39d27e..1824a22 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,8 +18,8 @@
 
 package io.gearpump.util
 
-
 import java.io.File
+import scala.concurrent.{ExecutionContext, Future}
 
 import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
 import akka.http.scaladsl.server.Directives._
@@ -28,22 +28,17 @@ import akka.stream.Materializer
 import akka.stream.scaladsl.FileIO
 import akka.util.ByteString
 
-import scala.concurrent.{ExecutionContext, Future}
-
-
 /**
  * FileDirective is a set of Akka-http directive to upload/download
- * huge binary files.
- *
+ * huge binary files to/from Akka-Http server.
  */
 object FileDirective {
 
-  //form field name
+  // Form field name
   type Name = String
 
   val CHUNK_SIZE = 262144
 
-
   /**
    * File information after a file is uploaded to server.
    *
@@ -71,7 +66,6 @@ object FileDirective {
 
   type FormField = Either[FileInfo, String]
 
-
   /**
    * directive to uploadFile, it store the uploaded files
    * to temporary directory, and return a Map from form field name
@@ -101,9 +95,7 @@ object FileDirective {
     }
   }
 
-  /**
-   * download server file
-   */
+  // Downloads file from server
   def downloadFile(file: File): Route = {
     val responseEntity = HttpEntity(
       MediaTypes.`application/octet-stream`,
@@ -112,14 +104,16 @@ object FileDirective {
     complete(responseEntity)
   }
 
-  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext): Directive1[Future[Form]] = {
+  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext)
+    : Directive1[Future[Form]] = {
     Directive[Tuple1[Future[Form]]] { inner =>
       entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
         val form = formdata.parts.mapAsync(1) { p =>
           if (p.filename.isDefined) {
 
-            //reserve the suffix
-            val targetPath = File.createTempFile(s"userfile_${p.name}_", s"${p.filename.getOrElse("")}", rootDirectory)
+            // Reserve the suffix
+            val targetPath = File.createTempFile(s"userfile_${p.name}_",
+              s"${p.filename.getOrElse("")}", rootDirectory)
             val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
             written.map(written =>
               if (written.count > 0) {
@@ -128,14 +122,14 @@ object FileDirective {
                 Map.empty[Name, FormField]
               })
           } else {
-            val valueFuture = p.entity.dataBytes.runFold(ByteString.empty){(total, input) =>
+            val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) {(total, input) =>
               total ++ input
             }
             valueFuture.map{value =>
               Map(p.name -> Right(value.utf8String))
             }
           }
-        }.runFold(new Form(Map.empty[Name, FormField])){(set, value) =>
+        }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) =>
           new Form(set.fields ++ value)
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/util/FileServer.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
index 4be3f2f..bf389f7 100644
--- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala
+++ b/daemon/src/main/scala/io/gearpump/util/FileServer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,28 +17,26 @@
  */
 package io.gearpump.util
 
-
 import java.io.File
+import scala.concurrent.{ExecutionContext, Future}
 
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.Http.ServerBinding
 import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.Uri.{Query, Path}
+import akka.http.scaladsl.model.Uri.{Path, Query}
 import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _}
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server._
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
-import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
-import akka.stream.scaladsl.{Sink, Source, FileIO}
-import io.gearpump.jarstore.FilePath
-import io.gearpump.util.FileDirective._
-import io.gearpump.util.FileServer.Port
+import akka.stream.scaladsl.{FileIO, Sink, Source}
 import spray.json.DefaultJsonProtocol._
 import spray.json.JsonFormat
 
-import scala.concurrent.{ExecutionContext, Future}
+import io.gearpump.jarstore.FilePath
+import io.gearpump.util.FileDirective._
+import io.gearpump.util.FileServer.Port
 
 /**
  * A simple file server implemented with akka-http to store/fetch large
@@ -53,7 +51,7 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory
   val route: Route = {
     path("upload") {
       uploadFileTo(rootDirectory) { form =>
-        val fileName = form.fields.headOption.flatMap{pair =>
+        val fileName = form.fields.headOption.flatMap { pair =>
           val (_, fileInfo) = pair
           fileInfo match {
             case Left(file) => Option(file.file).map(_.getName)
@@ -90,7 +88,7 @@ class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory
           """.stripMargin)
         complete(entity)
       }
-        }
+    }
   }
 
   private var connection: Future[ServerBinding] = null
@@ -109,10 +107,10 @@ object FileServer {
 
   implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply)
 
-  case class Port(port : Int)
+  case class Port(port: Int)
 
   /**
-   * Client of [[FileServer]]
+   * Client of [[io.gearpump.util.FileServer]]
    */
   class Client(system: ActorSystem, host: String, port: Int) {
 
@@ -125,27 +123,28 @@ object FileServer {
     private implicit val ec = system.dispatcher
 
     val server = Uri(s"http://$host:$port")
-    val httpClient = Http(system).outgoingConnection(server.authority.host.address(), server.authority.port)
+    val httpClient = Http(system).outgoingConnection(server.authority.host.address(),
+      server.authority.port)
 
     def upload(file: File): Future[FilePath] = {
       val target = server.withPath(Path("/upload"))
 
-      val request = entity(file).map{entity =>
+      val request = entity(file).map { entity =>
         HttpRequest(HttpMethods.POST, uri = target, entity = entity)
       }
 
       val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head)
-      response.flatMap{some =>
+      response.flatMap { some =>
         Unmarshal(some).to[String]
-      }.map{path =>
+      }.map { path =>
         FilePath(path)
       }
     }
 
     def download(remoteFile: FilePath, saveAs: File): Future[Unit] = {
-      val downoad = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
-      //download file to local
-      val response = Source.single(HttpRequest(uri = downoad)).via(httpClient).runWith(Sink.head)
+      val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path))
+      // Download file to local
+      val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head)
       val downloaded = response.flatMap { response =>
         response.entity.dataBytes.runWith(FileIO.toFile(saveAs))
       }
@@ -153,7 +152,8 @@ object FileServer {
     }
 
     private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
-      val entity =  HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromFile(file, chunkSize = 100000))
+      val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+        FileIO.fromFile(file, chunkSize = 100000))
       val body = Source.single(
         Multipart.FormData.BodyPart(
           "uploadfile",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
----------------------------------------------------------------------
diff --git a/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService b/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
index f0e0c5c..d226af9 100644
--- a/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
+++ b/daemon/src/test/resources/META-INF/services/io.gearpump.jarstore.JarStoreService
@@ -1,2 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 io.gearpump.jarstore.local.LocalJarStoreService
 io.gearpump.jarstore.dfs.DFSJarStoreService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala b/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala
index 36f70ed..c6dbbfe 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/MiniCluster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,18 +17,20 @@
  */
 package io.gearpump.cluster
 
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
 import akka.pattern.ask
-import akka.actor.{Actor, ActorRef, Props, ActorSystem}
 import akka.testkit.TestActorRef
 import com.typesafe.config.ConfigValueFactory
+
 import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
 import io.gearpump.cluster.MasterToAppMaster.WorkerList
 import io.gearpump.cluster.master.Master
 import io.gearpump.cluster.worker.Worker
 import io.gearpump.util.Constants
 
-import scala.concurrent.{Await, Future}
-
 class MiniCluster {
   private val mockMasterIP = "127.0.0.1"
 
@@ -39,7 +41,7 @@ class MiniCluster {
     val master = system.actorOf(Props(classOf[Master]), "master")
     val worker = system.actorOf(Props(classOf[Worker], master), "worker")
 
-    //wait until worker register itself to master
+    // Wait until worker register itself to master
     waitUtilWorkerIsRegistered(master)
     (master, worker)
   }
@@ -48,12 +50,10 @@ class MiniCluster {
     TestActorRef(props)
   }
 
-
   private def waitUtilWorkerIsRegistered(master: ActorRef): Unit = {
-    while(!isWorkerRegistered(master)) {}
+    while (!isWorkerRegistered(master)) {}
   }
 
-
   private def isWorkerRegistered(master: ActorRef): Boolean = {
     import scala.concurrent.duration._
     implicit val dispatcher = system.dispatcher
@@ -62,13 +62,13 @@ class MiniCluster {
 
     val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]
 
-    // wait until the worker is registered.
-    val workers = Await.result[WorkerList](workerListFuture, 15 seconds)
+    // Waits until the worker is registered.
+    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
     workers.workers.size > 0
   }
 
-  def shutDown() = {
-    system.shutdown()
-    system.awaitTermination()
+  def shutDown(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
index 9dc0289..30347d2 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala
@@ -15,34 +15,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.cluster.main
 
-import io.gearpump.cluster.MasterToAppMaster.{ReplayFromTimestampWindowTrailingEdge, AppMastersDataRequest}
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+import com.typesafe.config.Config
+import org.scalatest._
+
 import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
-import io.gearpump.cluster.MasterToAppMaster._
-import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ReplayApplicationResult, ShutdownApplicationResult}
+import io.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
+import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
 import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker
 import io.gearpump.cluster.{MasterHarness, TestUtil}
 import io.gearpump.util.Constants._
 import io.gearpump.util.{Constants, LogUtil, Util}
-import org.scalatest._
-
-import scala.concurrent.duration.Duration
-import scala.util.{Success, Try}
-
-import scala.concurrent.Future
 
 class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
 
   private val LOG = LogUtil.getLogger(getClass)
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
-  override def beforeEach() = {
+  override def beforeEach(): Unit = {
     startActorSystem()
   }
 
-  override def afterEach() = {
+  override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 
@@ -62,7 +62,6 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
       getMainClassName(Worker),
       Array.empty)
 
-
     try {
       masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
 
@@ -72,38 +71,41 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
     }
   }
 
-//  This UT fails a lot on Travis, temporarily delete it.
-//  "Master" should "accept worker RegisterNewWorker when started" in {
-//    val worker = TestProbe()(getActorSystem)
-//
-//    val port = Util.findFreePort.get
-//
-//    val masterConfig =  Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=127.0.0.1:$port",
-//      s"-D${Constants.GEARPUMP_HOSTNAME}=127.0.0.1")
-//
-//    val masterProcess = Util.startProcess(masterConfig,
-//      getContextClassPath,
-//      getMainClassName(io.gearpump.cluster.main.Master),
-//      Array("-ip", "127.0.0.1", "-port", port.toString))
-//
-//    //wait for master process to be started
-//
-//    try {
-//
-//      val masterProxy = getActorSystem.actorOf(MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
-//
-//      worker.send(masterProxy, RegisterNewWorker)
-//      worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
-//    } finally {
-//      masterProcess.destroy()
-//    }
-//  }
+  //  This UT fails a lot on Travis, temporarily delete it.
+  //  "Master" should "accept worker RegisterNewWorker when started" in {
+  //    val worker = TestProbe()(getActorSystem)
+  //
+  //    val port = Util.findFreePort.get
+  //
+  //    val masterConfig =  Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=127.0.0.1:$port",
+  //      s"-D${Constants.GEARPUMP_HOSTNAME}=127.0.0.1")
+  //
+  //    val masterProcess = Util.startProcess(masterConfig,
+  //      getContextClassPath,
+  //      getMainClassName(io.gearpump.cluster.main.Master),
+  //      Array("-ip", "127.0.0.1", "-port", port.toString))
+  //
+  //    //wait for master process to be started
+  //
+  //    try {
+  //
+  //      val masterProxy = getActorSystem.actorOf(
+  //        MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+  //
+  //      worker.send(masterProxy, RegisterNewWorker)
+  //      worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+  //    } finally {
+  //      masterProcess.destroy()
+  //    }
+  //  }
 
   "Info" should "be started without exception" in {
 
     val masterReceiver = createMockMaster()
 
-    Future {io.gearpump.cluster.main.Info.main(masterConfig, Array.empty)}
+    Future {
+      io.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+    }
 
     masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
     masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
@@ -113,7 +115,9 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
 
     val masterReceiver = createMockMaster()
 
-    Future {Kill.main(masterConfig, Array("-appid", "0"))}
+    Future {
+      Kill.main(masterConfig, Array("-appid", "0"))
+    }
 
     masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
     masterReceiver.reply(ShutdownApplicationResult(Success(0)))
@@ -123,7 +127,9 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
 
     val masterReceiver = createMockMaster()
 
-    Future {Replay.main(masterConfig, Array("-appid", "0"))}
+    Future {
+      Replay.main(masterConfig, Array("-appid", "0"))
+    }
 
     masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
     masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
@@ -132,7 +138,7 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
   }
 
   "Local" should "be started without exception" in {
-    val port = Util.findFreePort.get
+    val port = Util.findFreePort().get
     val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
       s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
       s"-D${PREFER_IPV4}=true")
@@ -149,14 +155,15 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
       val result = fn
       if (result || times <= 0) {
         result
-      } else  {
+      } else {
         Thread.sleep(1000)
         retry(times - 1)(fn)
       }
     }
 
     try {
-      assert(retry(10)(isPortUsed("127.0.0.1", port)), "local is not started successfully, as port is not used " + port)
+      assert(retry(10)(isPortUsed("127.0.0.1", port)),
+        "local is not started successfully, as port is not used " + port)
     } finally {
       local.destroy()
     }
@@ -169,9 +176,8 @@ class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with Maste
     assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
 
     for (command <- commands) {
-      //Temporarily disable this test
-      //assert(Try(Gear.main(Array(command))).isSuccess, "print help, no throw, command: " + command)
-      assert(Try(Gear.main(Array("-noexist"))).isFailure, "pass unknown option, throw, command: " + command)
+      assert(Try(Gear.main(Array("-noexist"))).isFailure,
+        "pass unknown option, throw, command: " + command)
     }
 
     assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
index 3927993..66b9ea8 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,15 +17,15 @@
  */
 package io.gearpump.cluster.main
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.{ActorSystem, Props}
-import akka.testkit.{TestActorRef, TestProbe}
+import akka.testkit.TestProbe
 import com.typesafe.config.Config
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.cluster.{MasterHarness, TestUtil}
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import org.scalatest.{FlatSpec, Matchers}
 
-import scala.concurrent.duration._
-import scala.language.postfixOps
+import io.gearpump.cluster.TestUtil
 
 class MasterWatcherSpec extends FlatSpec with Matchers {
   def config: Config = TestUtil.MASTER_CONFIG
@@ -37,8 +37,8 @@ class MasterWatcherSpec extends FlatSpec with Matchers {
 
     val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
     actorWatcher watch masterWatcher
-    actorWatcher.expectTerminated(masterWatcher, 5 seconds)
-    system.shutdown()
-    system.awaitTermination()
+    actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala
index ad19675..ee6e0e2 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/master/AppManagerSpec.scala
@@ -18,42 +18,44 @@
 
 package io.gearpump.cluster.master
 
+import scala.util.Success
+
 import akka.actor.{Actor, ActorRef, Props}
 import akka.testkit.TestProbe
-import io.gearpump.cluster.AppMasterToMaster.AppDataSaved
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, AppMasterRegistered}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.AppMasterToMaster._
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import io.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
 import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
-import io.gearpump.cluster.MasterToAppMaster._
-import io.gearpump.cluster.MasterToClient.{SubmitApplicationResult, ShutdownApplicationResult, ReplayApplicationResult, ResolveAppIdResult}
-import io.gearpump.cluster._
-import io.gearpump.cluster.master.AppManager._
+import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
 import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
+import io.gearpump.cluster.master.AppManager._
 import io.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.util.Success
+import io.gearpump.cluster.{TestUtil, _}
+import io.gearpump.util.LogUtil
 
 class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
   var kvService: TestProbe = null
   var haService: TestProbe = null
   var appLauncher: TestProbe = null
-  var appManager : ActorRef = null
+  var appManager: ActorRef = null
+  private val LOG = LogUtil.getLogger(getClass)
 
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
-  override def beforeEach() = {
+  override def beforeEach(): Unit = {
     startActorSystem()
     kvService = TestProbe()(getActorSystem)
     appLauncher = TestProbe()(getActorSystem)
 
-    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref, new DummyAppMasterLauncherFactory(appLauncher))))
+    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+      new DummyAppMasterLauncherFactory(appLauncher))))
     kvService.expectMsgType[GetKV]
     kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Map.empty)))
   }
 
-  override def afterEach() = {
+  override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 
@@ -84,7 +86,7 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with
   }
 
   "AppManager" should "support application submission and recover if appmaster dies" in {
-    Console.out.println("=================testing recover==============")
+    LOG.info("=================testing recover==============")
     testClientSubmission(withRecover = true)
   }
 
@@ -112,14 +114,15 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with
 
     kvService.expectMsgType[PutKV]
     appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, app.name)))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
     appMaster.expectMsgType[AppMasterRegistered]
 
     client.send(appManager, submit)
     assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure)
   }
 
-  def testClientSubmission(withRecover: Boolean) : Unit = {
+  def testClientSubmission(withRecover: Boolean): Unit = {
     val app = TestUtil.dummyApp
     val submit = SubmitApplication(app, None, "username")
     val client = TestProbe()(getActorSystem)
@@ -131,7 +134,8 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with
 
     kvService.expectMsgType[PutKV]
     appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, app.name)))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
     kvService.expectMsgType[PutKV]
     appMaster.expectMsgType[AppMasterRegistered]
 
@@ -148,10 +152,10 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with
       client.send(appManager, ShutdownApplication(appId))
       client.expectMsg(ShutdownApplicationResult(Success(appId)))
     } else {
-      //do recover
+      // Do recovery
       getActorSystem.stop(appMaster.ref)
       kvService.expectMsgType[GetKV]
-      val appState = ApplicationState(appId, "application1", 1, app , None, "username", null)
+      val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
       kvService.reply(GetKVSuccess(APP_STATE, appState))
       appLauncher.expectMsg(LauncherStarted(appId))
     }
@@ -160,7 +164,8 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with
 
 class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
 
-  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
     Props(new DummyAppMasterLauncher(test, appId))
   }
 }
@@ -169,8 +174,8 @@ class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
 
   test.ref ! LauncherStarted(appId)
   override def receive: Receive = {
-    case any : Any => test.ref forward any
+    case any: Any => test.ref forward any
   }
 }
 
-case class LauncherStarted(appId : Int)
+case class LauncherStarted(appId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
index 8f60d34..b929349 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/master/InMemoryKVServiceSpec.scala
@@ -18,28 +18,32 @@
 
 package io.gearpump.cluster.master
 
+import scala.concurrent.duration._
+
 import akka.actor.Props
 import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
 import io.gearpump.cluster.master.InMemoryKVService._
 import io.gearpump.cluster.{MasterHarness, TestUtil}
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import scala.concurrent.duration._
 
-class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+class InMemoryKVServiceSpec
+  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
 
-  override def beforeEach() = {
+  override def beforeEach(): Unit = {
     startActorSystem()
   }
 
-  override def afterEach() = {
+  override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 
-  override def config = TestUtil.MASTER_CONFIG
+  override def config: Config = TestUtil.MASTER_CONFIG
 
   "KVService" should "get, put, delete correctly" in {
-   val system = getActorSystem
-   val kvService = system.actorOf(Props(new InMemoryKVService()))
+    val system = getActorSystem
+    val kvService = system.actorOf(Props(new InMemoryKVService()))
     val group = "group"
 
     val client = TestProbe()(system)
@@ -55,11 +59,11 @@ class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEa
 
     client.send(kvService, DeleteKVGroup(group))
 
-    // after DeleteGroup, it no longer accept Get and Put
+    // After DeleteGroup, it no longer accept Get and Put message for this group.
     client.send(kvService, GetKV(group, "key"))
-    client.expectNoMsg(3 seconds)
+    client.expectNoMsg(3.seconds)
 
     client.send(kvService, PutKV(group, "key", 3))
-    client.expectNoMsg(3 seconds)
+    client.expectNoMsg(3.seconds)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
index aa2028b..a75ade2 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,25 +17,26 @@
  */
 package io.gearpump.cluster.scheduler
 
+import scala.concurrent.duration._
+
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
 import io.gearpump.cluster.TestUtil
 import io.gearpump.cluster.WorkerToMaster.ResourceUpdate
 import io.gearpump.cluster.master.Master.MasterInfo
+import io.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
 import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
+import io.gearpump.cluster.worker.WorkerId
 
 class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
   with WordSpecLike with Matchers with BeforeAndAfterAll{
 
-  def this() = this(ActorSystem("PrioritySchedulerSpec",  TestUtil.DEFAULT_CONFIG))
+  def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
   val appId = 0
   val workerId1: WorkerId = WorkerId(1, 0L)
   val workerId2: WorkerId = WorkerId(2, 0L)
@@ -51,28 +52,33 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
     "update resource only when the worker is registered" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
       scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
-      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been registered into master"))
+      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
+        s"registered into master"))
     }
 
     "drop application's resource requests when the application is removed" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
       scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
       scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
       scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-      mockAppMaster.expectNoMsg(5 seconds)
+      mockAppMaster.expectNoMsg(5.seconds)
     }
   }
 
+  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
+    left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+  }
+
   "The resource request with higher priority" should {
     "be handled first" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.LOW, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY)
-      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
 
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
@@ -81,58 +87,107 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
 
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))))
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
 
       scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
     }
   }
 
   "The resource request which delivered earlier" should {
     "be handled first if the priorities are the same" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
-      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY)
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
       scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
       scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
 
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
     }
   }
 
   "The PriorityScheduler" should {
     "handle the resource request with different relaxation" in {
       val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
-      val request1 = ResourceRequest(Resource(40), workerId2, Priority.HIGH, Relaxation.SPECIFICWORKER)
-      val request2 = ResourceRequest(Resource(20), workerId1, Priority.NORMAL, Relaxation.SPECIFICWORKER)
+      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
+      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
 
       scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
       scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
       scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+
+      var expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
 
       scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
 
-      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY, executorNum = 2)
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      val request3 = ResourceRequest(
+        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
       scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))))
 
-      //we have to manually update the resource on each worker
+      expect = ResourceAllocated(Array(
+        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
+
+      // We have to manually update the resource on each worker
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
-      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), Priority.NORMAL, Relaxation.ONEWORKER)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
       scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))))
+
+      expect = ResourceAllocated(
+        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))
+      mockAppMaster.expectMsgPF(5.seconds) {
+        case request: ResourceAllocated if sameElement(request, expect) => Unit
+      }
     }
   }
 
@@ -144,7 +199,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
 
-      //By default, the request requires only one executor
+      // By default, the request requires only one executor
       val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
       scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
       val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
@@ -157,7 +212,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       assert(allocations3.allocations.length == 3)
       assert(allocations3.allocations.forall(_.resource == Resource(8)))
 
-      //The total available resource can not satisfy the requirements with executor number
+      // The total available resource can not satisfy the requirements with executor number
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
       val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
@@ -166,7 +221,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       assert(allocations4.allocations.length == 2)
       assert(allocations4.allocations.forall(_.resource == Resource(20)))
 
-      //When new resources are available, the remaining request will be satisfied
+      // When new resources are available, the remaining request will be satisfied
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
       val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
       assert(allocations5.allocations.length == 1)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
index 9fc4096..46e8d37 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,14 @@
  */
 package io.gearpump.cluster.worker
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.{ActorSystem, PoisonPill, Props}
 import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import io.gearpump.WorkerId
-import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker
-import io.gearpump.cluster.{TestUtil, MasterHarness}
-import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest._
+
 import io.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
 import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
 import io.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
@@ -32,13 +33,9 @@ import io.gearpump.cluster.master.Master.MasterInfo
 import io.gearpump.cluster.scheduler.Resource
 import io.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
 import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
-import org.scalatest._
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
 
 class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-  override def config = TestUtil.DEFAULT_CONFIG
+  override def config: Config = TestUtil.DEFAULT_CONFIG
 
   val appId = 1
   val workerId: WorkerId = WorkerId(1, 0L)
@@ -48,14 +45,14 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas
   var client: TestProbe = null
   val workerSlots = 50
 
-  override def beforeEach() = {
+  override def beforeEach(): Unit = {
     startActorSystem()
     mockMaster = TestProbe()(getActorSystem)
     masterProxy = TestProbe()(getActorSystem)
     client = TestProbe()(getActorSystem)
   }
 
-  override def afterEach() = {
+  override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 
@@ -64,7 +61,7 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas
       val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
       mockMaster watch worker
       mockMaster.expectMsg(RegisterNewWorker)
-      mockMaster.expectTerminated(worker, 60 seconds)
+      mockMaster.expectTerminated(worker, 60.seconds)
     }
   }
 
@@ -80,10 +77,11 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas
       worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
       mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
 
-      worker.tell(UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
-      mockMaster.expectTerminated(worker, 5 seconds)
-      workerSystem.shutdown()
-      workerSystem.awaitTermination()
+      worker.tell(
+        UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
+      mockMaster.expectTerminated(worker, 5.seconds)
+      workerSystem.terminate()
+      Await.result(workerSystem.whenTerminated, Duration.Inf)
     }
   }
 
@@ -96,11 +94,17 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas
       mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
 
       val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
-      val reportBack = "dummy"   //This is an actor path which the ActorSystemBooter will report back to, not needed in this test.
-      val executionContext = ExecutorJVMConfig(Array.empty[String], getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "), classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None, username = "user")
-
-      //Test LaunchExecutor
-      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext), mockMaster.ref)
+      // This is an actor path which the ActorSystemBooter will report back to,
+      // not needed in this test
+      val reportBack = "dummy"
+      val executionContext = ExecutorJVMConfig(Array.empty[String],
+        getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
+        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
+        username = "user")
+
+      // Test LaunchExecutor
+      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
+        mockMaster.ref)
       mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
 
       worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
@@ -109,13 +113,14 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas
       worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
       mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
 
-      //Test terminationWatch
+      // Test terminationWatch
       worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
       mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
       client.expectMsg(ShutdownExecutorSucceed(1, 1))
 
       worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
-      client.expectMsg(ShutdownExecutorFailed(s"Can not find executor ${executorId + 1} for app $appId"))
+      client.expectMsg(ShutdownExecutorFailed(
+        s"Can not find executor ${executorId + 1} for app $appId"))
 
       mockMaster.ref ! PoisonPill
       masterProxy.expectMsg(RegisterWorker(workerId))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala b/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala
index c2ca669..66c7c1d 100644
--- a/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/util/FileServerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,30 +20,29 @@ package io.gearpump.util
 
 import java.io.File
 import java.util.concurrent.TimeUnit
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
-import akka.actor.{ActorSystem, Props}
-import akka.pattern.ask
+import io.gearpump.cluster.TestUtil
 import io.gearpump.google.common.io.Files
-import io.gearpump.cluster.{ClusterConfig, TestUtil}
 import io.gearpump.jarstore.FilePath
 import io.gearpump.util.FileServer._
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-import scala.util.Success
 
-class FileServerSpec  extends WordSpecLike with Matchers with BeforeAndAfterAll {
+class FileServerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll {
 
   implicit val timeout = akka.util.Timeout(25, TimeUnit.SECONDS)
   val host = "localhost"
+  private val LOG = LogUtil.getLogger(getClass)
 
   var system: ActorSystem = null
 
   override def afterAll {
     if (null != system) {
-      system.shutdown()
-      system.awaitTermination()
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
     }
   }
 
@@ -57,7 +56,7 @@ class FileServerSpec  extends WordSpecLike with Matchers with BeforeAndAfterAll
     FileUtils.writeByteArrayToFile(file, data)
     val future = client.upload(file)
     import scala.concurrent.duration._
-    val path = Await.result(future, 30 seconds)
+    val path = Await.result(future, 30.seconds)
     file.delete()
     path
   }
@@ -66,7 +65,7 @@ class FileServerSpec  extends WordSpecLike with Matchers with BeforeAndAfterAll
     val file = File.createTempFile("fileserverspec", "test")
     val future = client.download(remote, file)
     import scala.concurrent.duration._
-    val data = Await.result(future, 10 seconds)
+    val data = Await.result(future, 10.seconds)
 
     val bytes = FileUtils.readFileToByteArray(file)
     file.delete()
@@ -81,7 +80,7 @@ class FileServerSpec  extends WordSpecLike with Matchers with BeforeAndAfterAll
       val server = new FileServer(system, host, 0, rootDir)
       val port = Await.result((server.start), Duration(25, TimeUnit.SECONDS))
 
-      println("start test web server on port " + port)
+      LOG.info("start test web server on port " + port)
 
       val sizes = List(1, 100, 1000000, 50000000)
       val client = new Client(system, host, port.port)
@@ -113,7 +112,7 @@ class FileServerSpec  extends WordSpecLike with Matchers with BeforeAndAfterAll
     }
   }
 
-  private def randomBytes(size : Int) : Array[Byte] = {
+  private def randomBytes(size: Int): Array[Byte] = {
     val bytes = new Array[Byte](size)
     (new java.util.Random()).nextBytes(bytes)
     bytes

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 95fc380..e0793ee 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -5,12 +5,19 @@ have docs corresponding to your checked out version.
 
 # Requirements
 
+You need to install ruby and ruby-dev first. On Ubuntu, you ca run command like this:
+
+    sudo apt-get install ruby
+    sudo apt-get install ruby-dev
+    sudo apt-get install python-setuptools
+
 We use Markdown to write and Jekyll to translate the documentation to static HTML. You can install
 all needed software via:
 
-    gem install jekyll
-    gem install kramdown
-    gem install html-proofer
+    sudo gem install jekyll
+    sudo gem install kramdown
+    sudo gem install html-proofer
+    sudo gem install pygments.rb
     sudo easy_install Pygments
 
 For Mac OSX you may need to do `sudo gem install -n /usr/local/bin jekyll` if you see the following error:
@@ -22,17 +29,10 @@ ERROR:  While executing gem ... (Errno::EPERM)
 Kramdown is needed for Markdown processing and the Python based Pygments is used for syntax
 highlighting.
 
-# How to Test
-
-Command `jekyll build` can be used to make a test build.
-
-Command `jekyll serve --watch` can be used for debug purpose. Jekyll will start a web server at
-`localhost:4000` and watch the docs directory for updates. Use this mode to experiment commits and check changes locally.
-
 # How to Build
 Command `./build_doc.sh` can be used to create a full document folder under site/. 
 
-# Contribute
+# How to contribute
 
 The documentation pages are written in
 [Markdown](http://daringfireball.net/projects/markdown/syntax). It is possible to use the
@@ -52,7 +52,7 @@ Furthermore, you can access variables found in `docs/_config.yml` as follows:
 This will be replaced with the value of the variable called `NAME` when generating
 the docs.
 
-All documents are structed with headings. From these heading, a page outline is
+All documents are structured with headings. From these heading, a page outline is
 automatically generated for each page.
 
 ```
@@ -66,3 +66,10 @@ automatically generated for each page.
 Please stick to the "logical order" when using the headlines, e.g. start with level-2 headings and
 use level-3 headings for subsections, etc. Don't use a different ordering, because you don't like
 how a headline looks.
+
+# How to Test
+
+Command `jekyll build` can be used to make a test build.
+
+Command `jekyll serve --watch` can be used for debug purpose. Jekyll will start a web server at
+`localhost:4000` and watch the docs directory for updates. Use this mode to experiment commits and check changes locally.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/_layouts/404.html
----------------------------------------------------------------------
diff --git a/docs/_layouts/404.html b/docs/_layouts/404.html
index 0446544..1bfbe21 100644
--- a/docs/_layouts/404.html
+++ b/docs/_layouts/404.html
@@ -1,157 +1,159 @@
 <!DOCTYPE html>
 <html lang="en">
-    <head>
-        <meta charset="utf-8">
-        <title>Page Not Found :(</title>
-        <style>
-            ::-moz-selection {
-                background: #b3d4fc;
-                text-shadow: none;
-            }
-
-            ::selection {
-                background: #b3d4fc;
-                text-shadow: none;
-            }
-
-            html {
-                padding: 30px 10px;
-                font-size: 20px;
-                line-height: 1.4;
-                color: #737373;
-                background: #f0f0f0;
-                -webkit-text-size-adjust: 100%;
-                -ms-text-size-adjust: 100%;
-            }
-
-            html,
-            input {
-                font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
-            }
-
-            body {
-                max-width: 500px;
-                _width: 500px;
-                padding: 30px 20px 50px;
-                border: 1px solid #b3b3b3;
-                border-radius: 4px;
-                margin: 0 auto;
-                box-shadow: 0 1px 10px #a7a7a7, inset 0 1px 0 #fff;
-                background: #fcfcfc;
-            }
-
-            h1 {
-                margin: 0 10px;
-                font-size: 50px;
-                text-align: center;
-            }
-
-            h1 span {
-                color: #bbb;
-            }
-
-            h3 {
-                margin: 1.5em 0 0.5em;
-            }
-
-            p {
-                margin: 1em 0;
-            }
-
-            ul {
-                padding: 0 0 0 40px;
-                margin: 1em 0;
-            }
-
-            .container {
-                max-width: 380px;
-                _width: 380px;
-                margin: 0 auto;
-            }
-
-            /* google search */
-
-            #goog-fixurl ul {
-                list-style: none;
-                padding: 0;
-                margin: 0;
-            }
-
-            #goog-fixurl form {
-                margin: 0;
-            }
-
-            #goog-wm-qt,
-            #goog-wm-sb {
-                border: 1px solid #bbb;
-                font-size: 16px;
-                line-height: normal;
-                vertical-align: top;
-                color: #444;
-                border-radius: 2px;
-            }
-
-            #goog-wm-qt {
-                width: 220px;
-                height: 20px;
-                padding: 5px;
-                margin: 5px 10px 0 0;
-                box-shadow: inset 0 1px 1px #ccc;
-            }
-
-            #goog-wm-sb {
-                display: inline-block;
-                height: 32px;
-                padding: 0 10px;
-                margin: 5px 0 0;
-                white-space: nowrap;
-                cursor: pointer;
-                background-color: #f5f5f5;
-                background-image: -webkit-linear-gradient(rgba(255,255,255,0), #f1f1f1);
-                background-image: -moz-linear-gradient(rgba(255,255,255,0), #f1f1f1);
-                background-image: -ms-linear-gradient(rgba(255,255,255,0), #f1f1f1);
-                background-image: -o-linear-gradient(rgba(255,255,255,0), #f1f1f1);
-                -webkit-appearance: none;
-                -moz-appearance: none;
-                appearance: none;
-                *overflow: visible;
-                *display: inline;
-                *zoom: 1;
-            }
-
-            #goog-wm-sb:hover,
-            #goog-wm-sb:focus {
-                border-color: #aaa;
-                box-shadow: 0 1px 1px rgba(0, 0, 0, 0.1);
-                background-color: #f8f8f8;
-            }
-
-            #goog-wm-qt:hover,
-            #goog-wm-qt:focus {
-                border-color: #105cb6;
-                outline: 0;
-                color: #222;
-            }
-
-            input::-moz-focus-inner {
-                padding: 0;
-                border: 0;
-            }
-        </style>
-    </head>
-    <body>
-        <div class="container">
-            <h1>Not found <span>:(</span></h1>
-            <p>Sorry, but the page you were trying to view does not exist.</p>
-            <p>It looks like this was the result of either:</p>
-            <ul>
-                <li>a mistyped address</li>
-                <li>an out-of-date link</li>
-            </ul>
-            <script>
-                var GOOG_FIXURL_LANG = (navigator.language || '').slice(0,2),GOOG_FIXURL_SITE = location.host;
-            </script>
-            <script src="http://linkhelp.clients.google.com/tbproxy/lh/wm/fixurl.js"></script>
-        </div>
-    </body>
+<head>
+  <meta charset="utf-8">
+  <title>Page Not Found :(</title>
+  <style>
+    ::-moz-selection {
+      background: #b3d4fc;
+      text-shadow: none;
+    }
+
+    ::selection {
+      background: #b3d4fc;
+      text-shadow: none;
+    }
+
+    html {
+      padding: 30px 10px;
+      font-size: 20px;
+      line-height: 1.4;
+      color: #737373;
+      background: #f0f0f0;
+      -webkit-text-size-adjust: 100%;
+      -ms-text-size-adjust: 100%;
+    }
+
+    html,
+    input {
+      font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
+    }
+
+    body {
+      max-width: 500px;
+      _width: 500px;
+      padding: 30px 20px 50px;
+      border: 1px solid #b3b3b3;
+      border-radius: 4px;
+      margin: 0 auto;
+      box-shadow: 0 1px 10px #a7a7a7, inset 0 1px 0 #fff;
+      background: #fcfcfc;
+    }
+
+    h1 {
+      margin: 0 10px;
+      font-size: 50px;
+      text-align: center;
+    }
+
+    h1 span {
+      color: #bbb;
+    }
+
+    h3 {
+      margin: 1.5em 0 0.5em;
+    }
+
+    p {
+      margin: 1em 0;
+    }
+
+    ul {
+      padding: 0 0 0 40px;
+      margin: 1em 0;
+    }
+
+    .container {
+      max-width: 380px;
+      _width: 380px;
+      margin: 0 auto;
+    }
+
+    /* google search */
+
+    #goog-fixurl ul {
+      list-style: none;
+      padding: 0;
+      margin: 0;
+    }
+
+    #goog-fixurl form {
+      margin: 0;
+    }
+
+    #goog-wm-qt,
+    #goog-wm-sb {
+      border: 1px solid #bbb;
+      font-size: 16px;
+      line-height: normal;
+      vertical-align: top;
+      color: #444;
+      border-radius: 2px;
+    }
+
+    #goog-wm-qt {
+      width: 220px;
+      height: 20px;
+      padding: 5px;
+      margin: 5px 10px 0 0;
+      box-shadow: inset 0 1px 1px #ccc;
+    }
+
+    #goog-wm-sb {
+      display: inline-block;
+      height: 32px;
+      padding: 0 10px;
+      margin: 5px 0 0;
+      white-space: nowrap;
+      cursor: pointer;
+      background-color: #f5f5f5;
+      background-image: -webkit-linear-gradient(rgba(255, 255, 255, 0), #f1f1f1);
+      background-image: -moz-linear-gradient(rgba(255, 255, 255, 0), #f1f1f1);
+      background-image: -ms-linear-gradient(rgba(255, 255, 255, 0), #f1f1f1);
+      background-image: -o-linear-gradient(rgba(255, 255, 255, 0), #f1f1f1);
+      -webkit-appearance: none;
+      -moz-appearance: none;
+      appearance: none;
+      *overflow: visible;
+      *display: inline;
+      *zoom: 1;
+    }
+
+    #goog-wm-sb:hover,
+    #goog-wm-sb:focus {
+      border-color: #aaa;
+      box-shadow: 0 1px 1px rgba(0, 0, 0, 0.1);
+      background-color: #f8f8f8;
+    }
+
+    #goog-wm-qt:hover,
+    #goog-wm-qt:focus {
+      border-color: #105cb6;
+      outline: 0;
+      color: #222;
+    }
+
+    input::-moz-focus-inner {
+      padding: 0;
+      border: 0;
+    }
+  </style>
+</head>
+<body>
+<div class="container">
+  <h1>Not found <span>:(</span></h1>
+
+  <p>Sorry, but the page you were trying to view does not exist.</p>
+
+  <p>It looks like this was the result of either:</p>
+  <ul>
+    <li>a mistyped address</li>
+    <li>an out-of-date link</li>
+  </ul>
+  <script>
+    var GOOG_FIXURL_LANG = (navigator.language || '').slice(0, 2), GOOG_FIXURL_SITE = location.host;
+  </script>
+  <script src="http://linkhelp.clients.google.com/tbproxy/lh/wm/fixurl.js"></script>
+</div>
+</body>
 </html>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/_layouts/global.html
----------------------------------------------------------------------
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index d17a55a..d5c7a20 100644
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -1,20 +1,25 @@
 <!DOCTYPE html>
-<!--[if lt IE 7]>      <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
-<!--[if IE 7]>         <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
-<!--[if IE 8]>         <html class="no-js lt-ie9"> <![endif]-->
-<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
+<!--[if lt IE 7]>
+<html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
+<!--[if IE 7]>
+<html class="no-js lt-ie9 lt-ie8"> <![endif]-->
+<!--[if IE 8]>
+<html class="no-js lt-ie9"> <![endif]-->
+<!--[if gt IE 8]><!-->
+<html class="no-js"> <!--<![endif]-->
 <head>
   <meta charset="utf-8">
   <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
   <meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=1"/>
   <title>{{ page.title }} - Gearpump {{site.GEARPUMP_VERSION}} Documentation</title>
   {% if page.description %}
-   <meta name="description" content="{{page.description | replace: 'GEARPUMP_VERSION', site.GEARPUMP_VERSION}}">
+  <meta name="description"
+        content="{{page.description | replace: 'GEARPUMP_VERSION', site.GEARPUMP_VERSION}}">
   {% endif %}
 
   {% if page.redirect %}
-   <meta http-equiv="refresh" content="0; url={{page.redirect}}">
-   <link rel="canonical" href="{{page.redirect}}" />
+  <meta http-equiv="refresh" content="0; url={{page.redirect}}">
+  <link rel="canonical" href="{{page.redirect}}"/>
   {% endif %}
 
   <link rel="stylesheet" href="css/bootstrap-3.3.5.min.css">
@@ -29,139 +34,149 @@
   <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
 </head>
 <body>
-  <!--[if lt IE 7]>
-    <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
-  <![endif]-->
+<!--[if lt IE 7]>
+<p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade
+  your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install
+  Google Chrome Frame</a> to better experience this site.</p>
+<![endif]-->
 
-  <div class="navbar navbar-inverse navbar-fixed-top" id="topbar">
-    <div class="container">
-      <div class="navbar-header">
-        <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar">
-          <span class="sr-only">Toggle navigation</span>
-          <span class="icon-bar"></span>
-          <span class="icon-bar"></span>
-          <span class="icon-bar"></span>
-        </button>
-        <a class="navbar-brand" href="http://gearpump.io">Gearpump
-          <span class="label label-primary" style="font-size: .6em">{{site.GEARPUMP_VERSION}}</span>
-        </a>
-      </div>
-      <div id="navbar" class="collapse navbar-collapse">
-        <ul class="nav navbar-nav">
-          <li><a href="index.html">Overview</a></li>
+<div class="navbar navbar-inverse navbar-fixed-top" id="topbar">
+  <div class="container">
+    <div class="navbar-header">
+      <button type="button" class="navbar-toggle collapsed" data-toggle="collapse"
+              data-target="#navbar" aria-expanded="false" aria-controls="navbar">
+        <span class="sr-only">Toggle navigation</span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+      </button>
+      <a class="navbar-brand" href="http://gearpump.io">Gearpump
+        <span class="label label-primary" style="font-size: .6em">{{site.GEARPUMP_VERSION}}</span>
+      </a>
+    </div>
+    <div id="navbar" class="collapse navbar-collapse">
+      <ul class="nav navbar-nav">
+        <li><a href="index.html">Overview</a></li>
 
-          <li class="dropdown">
-            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Introduction<b class="caret"></b></a>
-            <ul class="dropdown-menu">
-              <li><a href="submit-your-1st-application.html">Submit Your 1st Application</a></li>
-              <li><a href="commandline.html">Client Command Line</a></li>
-              <li class="divider"></li>
-              <li><a href="basic-concepts.html">Basic Concepts</a></li>
-              <li><a href="features.html">Technical Highlights</a></li>
-              <li><a href="message-delivery.html">Reliable Message Delivery</a></li>
-              <li><a href="performance-report.html">Performance</a></li>
-              <li><a href="gearpump-internals.html">Gearpump Internals</a></li>
-            </ul>
-          </li>
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Introduction<b
+            class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="submit-your-1st-application.html">Submit Your 1st Application</a></li>
+            <li><a href="commandline.html">Client Command Line</a></li>
+            <li class="divider"></li>
+            <li><a href="basic-concepts.html">Basic Concepts</a></li>
+            <li><a href="features.html">Technical Highlights</a></li>
+            <li><a href="message-delivery.html">Reliable Message Delivery</a></li>
+            <li><a href="performance-report.html">Performance</a></li>
+            <li><a href="gearpump-internals.html">Gearpump Internals</a></li>
+          </ul>
+        </li>
 
-          <li class="dropdown">
-            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
-            <ul class="dropdown-menu">
-              <li class="dropdown-header">Deployment</li>
-              <li><a href="deployment-local.html">Local Mode</a><li>
-              <li><a href="deployment-standalone.html">Standalone Mode</a></li>
-              <li><a href="deployment-yarn.html">YARN Mode</a></li>
-              <li><a href="deployment-docker.html">Docker Mode</a><li>
-              <li class="divider"></li>
-              <li><a href="deployment-ui-authentication.html">UI Authentication</a></li>
-              <li><a href="deployment-ha.html">High Availability</a></li>
-              <li><a href="deployment-msg-delivery.html">Reliable Message Delivery</a></li>
-              <li><a href="deployment-configuration.html">Configuration</a></li>
-              <li><a href="deployment-resource-isolation.html">Resource Isolation</a></li>
-              <li class="divider"></li>
-              <li><a href="deployment-security.html">YARN Security Guide</a></li>
-            </ul>
-          </li>
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li class="dropdown-header">Deployment</li>
+            <li><a href="deployment-local.html">Local Mode</a>
+            <li>
+            <li><a href="deployment-standalone.html">Standalone Mode</a></li>
+            <li><a href="deployment-yarn.html">YARN Mode</a></li>
+            <li><a href="deployment-docker.html">Docker Mode</a>
+            <li>
+            <li class="divider"></li>
+            <li><a href="deployment-ui-authentication.html">UI Authentication</a></li>
+            <li><a href="deployment-ha.html">High Availability</a></li>
+            <li><a href="deployment-msg-delivery.html">Reliable Message Delivery</a></li>
+            <li><a href="deployment-configuration.html">Configuration</a></li>
+            <li><a href="deployment-resource-isolation.html">Resource Isolation</a></li>
+            <li class="divider"></li>
+            <li><a href="deployment-security.html">YARN Security Guide</a></li>
+          </ul>
+        </li>
 
-          <li class="dropdown">
-            <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guide<b class="caret"></b></a>
-            <ul class="dropdown-menu">
-              <li><a href="dev-write-1st-app.html">Write Your 1st App</a></li>
-              <li><a href="dev-custom-serializer.html">Customized Message Passing</a></li>
-              <li class="divider"></li>
-              <li><a href="api/scala/index.html">Scala API</a></li>
-              <li><a href="api/java/index.html">Java API</a></li>
-              <li><a href="dev-rest-api.html">RESTful API</a></li>
-              <li class="divider"></li>
-              <li><a href="dev-connectors.html">Gearpump Connectors</a></li>
-              <li class="divider"></li>
-              <li><a href="dev-storm.html">Storm Compatibility</a></li>
-              <!--
-              <li><a href="dev-samoa.html">Samoa Compatibility</a></li>
-              <li class="divider"></li>
-              <li><a href="dev-iot.html">Gearpump with IoT</a></li>
-              -->
-            </ul>
-          </li>
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guide<b
+            class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="dev-write-1st-app.html">Write Your 1st App</a></li>
+            <li><a href="dev-custom-serializer.html">Customized Message Passing</a></li>
+            <li class="divider"></li>
+            <li><a href="api/scala/index.html">Scala API</a></li>
+            <li><a href="api/java/index.html">Java API</a></li>
+            <li><a href="dev-rest-api.html">RESTful API</a></li>
+            <li class="divider"></li>
+            <li><a href="dev-connectors.html">Gearpump Connectors</a></li>
+            <li class="divider"></li>
+            <li><a href="dev-storm.html">Storm Compatibility</a></li>
+            <!--
+            <li><a href="dev-samoa.html">Samoa Compatibility</a></li>
+            <li class="divider"></li>
+            <li><a href="dev-iot.html">Gearpump with IoT</a></li>
+            -->
+          </ul>
+        </li>
 
-          <li class="dropdown">
-            <a href="#" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
-            <ul class="dropdown-menu">
-              <li><a href="how-to-contribute.html">How to Contribute</a></li>
-              <li><a href="coding-style.html">Coding Style</a></li>
-              <li class="divider"></li>
-              <li><a href="faq.html">FAQ</a><li>
-              <li><a href="about.html">About</a></li>
-            </ul>
-          </li>
-        </ul>
-      </div>
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
+          <ul class="dropdown-menu">
+            <li><a href="how-to-contribute.html">How to Contribute</a></li>
+            <li><a href="coding-style.html">Coding Style</a></li>
+            <li class="divider"></li>
+            <li><a href="faq.html">FAQ</a>
+            <li>
+            <li><a href="about.html">About</a></li>
+          </ul>
+        </li>
+      </ul>
     </div>
   </div>
+</div>
 
-  <div class="container" id="content">
-   {% if page.displayTitle %}
-    <h1 class="title">{{ page.displayTitle }}</h1>
-   {% else %}
-    <h1 class="title">{{ page.title }}</h1>
-   {% endif %}
+<div class="container" id="content">
+  {% if page.displayTitle %}
+  <h1 class="title">{{ page.displayTitle }}</h1>
+  {% else %}
+  <h1 class="title">{{ page.title }}</h1>
+  {% endif %}
 
-   {{ content }}
+  {{ content }}
 
-  </div> <!-- /container -->
+</div>
+<!-- /container -->
 
-  <script src="js/vendor/jquery-2.1.4.min.js"></script>
-  <script src="js/vendor/bootstrap-3.3.5.min.js"></script>
-  <script src="js/vendor/anchor-1.1.1.min.js"></script>
-  <script src="js/main.js"></script>
+<script src="js/vendor/jquery-2.1.4.min.js"></script>
+<script src="js/vendor/bootstrap-3.3.5.min.js"></script>
+<script src="js/vendor/anchor-1.1.1.min.js"></script>
+<script src="js/main.js"></script>
 
-  <!-- MathJax Section -->
-  <script type="text/x-mathjax-config">
+<!-- MathJax Section -->
+<script type="text/x-mathjax-config">
     MathJax.Hub.Config({
       TeX: { equationNumbers: { autoNumber: "AMS" } }
     });
-  </script>
-  <script>
-    // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
-    // We could use "//cdn.mathjax...", but that won't support "file://".
-    (function(d, script) {
-      script = d.createElement('script');
-      script.type = 'text/javascript';
-      script.async = true;
-      script.onload = function(){
-        MathJax.Hub.Config({
-          tex2jax: {
-            inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
-            displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
-            processEscapes: true,
-            skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
-          }
-        });
-      };
-      script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
-        'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
-      d.getElementsByTagName('head')[0].appendChild(script);
-    }(document));
-  </script>
+
+</script>
+<script>
+  // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
+  // We could use "//cdn.mathjax...", but that won't support "file://".
+  (function (d, script) {
+    script = d.createElement('script');
+    script.type = 'text/javascript';
+    script.async = true;
+    script.onload = function () {
+      MathJax.Hub.Config({
+        tex2jax: {
+          inlineMath: [["$", "$"], ["\\\\(", "\\\\)"]],
+          displayMath: [["$$", "$$"], ["\\[", "\\]"]],
+          processEscapes: true,
+          skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
+        }
+      });
+    };
+    script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
+      'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
+    d.getElementsByTagName('head')[0].appendChild(script);
+  }(document));
+</script>
 </body>
 </html>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/build_doc.sh
----------------------------------------------------------------------
diff --git a/docs/build_doc.sh b/docs/build_doc.sh
index c8f691e..72a2079 100755
--- a/docs/build_doc.sh
+++ b/docs/build_doc.sh
@@ -32,7 +32,7 @@ export BUILD_API=$2
 # generate _site documents
 jekyll build
 
-# check html link validality
+# check html link validity
 echo "Checking generated HTMLs..."
 htmlproof _site \
   --disable-external \
@@ -46,7 +46,6 @@ if [ "$BUILD_API" = 1 ]; then
   echo "Running 'sbt clean unidoc'; this may take a few minutes..."
   cd $CURDIR/..
   sbt clean unidoc
-
   echo "Moving back into docs dir."
   cd $CURDIR
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/css/api-javadocs.css
----------------------------------------------------------------------
diff --git a/docs/css/api-javadocs.css b/docs/css/api-javadocs.css
index 832e926..e48ebcf 100644
--- a/docs/css/api-javadocs.css
+++ b/docs/css/api-javadocs.css
@@ -35,8 +35,8 @@
   padding-right: 9px;
   padding-left: 9px;
   -webkit-border-radius: 9px;
-     -moz-border-radius: 9px;
-          border-radius: 9px;
+  -moz-border-radius: 9px;
+  border-radius: 9px;
 }
 
 .developer {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/docs/css/main.css
----------------------------------------------------------------------
diff --git a/docs/css/main.css b/docs/css/main.css
index f380701..781d05b 100644
--- a/docs/css/main.css
+++ b/docs/css/main.css
@@ -42,7 +42,7 @@ h3 {
  * using solution at http://stackoverflow.com/questions/8878033/how-
  * to-make-twitter-bootstrap-menu-dropdown-on-hover-rather-than-click
  **/
-ul.nav li.dropdown:hover ul.dropdown-menu{
+ul.nav li.dropdown:hover ul.dropdown-menu {
   display: block;
 }
 
@@ -84,26 +84,39 @@ ul.nav li.dropdown ul.dropdown-menu li.dropdown-submenu ul.dropdown-menu {
 /**
  * MathJax (embedded latex formulas)
  */
-.MathJax .mo { color: inherit }
-.MathJax .mi { color: inherit }
-.MathJax .mf { color: inherit }
-.MathJax .mh { color: inherit }
+.MathJax .mo {
+  color: inherit
+}
+
+.MathJax .mi {
+  color: inherit
+}
+
+.MathJax .mf {
+  color: inherit
+}
+
+.MathJax .mh {
+  color: inherit
+}
 
 /**
  * AnchorJS (anchor links when hovering over headers)
  */
-a.anchorjs-link:hover { text-decoration: none; }
+a.anchorjs-link:hover {
+  text-decoration: none;
+}
 
 /**
  * Dashboard Look And Feel Adjustments
  */
 * {
-  font-family: roboto,"Helvetica Neue",Helvetica,Arial,sans-serif;
+  font-family: roboto, "Helvetica Neue", Helvetica, Arial, sans-serif;
 }
 
 pre, pre > *,
 code, code > * {
-  font-family: "roboto mono",monaco,consolas,menlo,"Lucida Console",monospace !important;
+  font-family: "roboto mono", monaco, consolas, menlo, "Lucida Console", monospace !important;
   font-size: 12px;
   font-weight: normal !important;
   line-height: 165%;