You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:37 UTC

[27/49] incubator-gearpump git commit: GEARPUMP-11, fix code style

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
index 738bbad..e67731e 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,21 @@
 
 package io.gearpump.services
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.ActorSystem
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.TestUtil
-import akka.http.scaladsl.testkit.{ScalatestRouteTest, RouteTestTimeout}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
-import io.gearpump.services.util.UpickleUtil._
-import scala.concurrent.duration._
 
-import scala.util.Try
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
 
-class AdminServiceSpec  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+class AdminServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
 
   override def testConfig: Config = TestUtil.DEFAULT_CONFIG
 
@@ -36,14 +40,14 @@ class AdminServiceSpec  extends FlatSpec with ScalatestRouteTest with Matchers w
 
   it should "shutdown the ActorSystem when receiving terminate" in {
     val route = new AdminService(actorSystem).route
-    implicit val customTimeout = RouteTestTimeout(15 seconds)
-    (Post(s"/terminate") ~> route) ~> check{
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Post(s"/terminate") ~> route) ~> check {
       assert(status.intValue() == 404)
     }
 
-    actorSystem.awaitTermination(20 seconds)
+    Await.result(actorSystem.whenTerminated, 20.seconds)
 
     // terminate should terminate current actor system
-    assert(actorSystem.isTerminated)
+    assert(actorSystem.whenTerminated.isCompleted)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
index 7b98cb6..59da80d 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,19 @@
 
 package io.gearpump.services
 
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
 import akka.actor.ActorRef
 import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.server.RouteResult
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
 import akka.testkit.TestActor.{AutoPilot, KeepRunning}
 import akka.testkit.{TestKit, TestProbe}
 import com.typesafe.config.{Config, ConfigFactory}
-import akka.http.scaladsl.testkit.RouteTestTimeout
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.slf4j.Logger
+import upickle.default.read
+
 import io.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
 import io.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
 import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
@@ -33,13 +39,8 @@ import io.gearpump.cluster.TestUtil
 import io.gearpump.jarstore.JarStoreService
 import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
 import io.gearpump.util.LogUtil
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.slf4j.Logger
-import upickle.default.read
-import akka.http.scaladsl.testkit.ScalatestRouteTest
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
 
 class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
   with Matchers with BeforeAndAfterAll {
@@ -47,7 +48,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
   override def testConfig: Config = TestUtil.UI_CONFIG
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
-  def actorRefFactory = system
+  private def actorRefFactory = system
 
   val mockAppMaster = TestProbe()
   val failure = LastFailure(System.currentTimeMillis(), "Some error")
@@ -56,13 +57,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   def jarStore: JarStoreService = jarStoreService
 
-  def master = mockMaster.ref
+  private def master = mockMaster.ref
 
-  def appMasterRoute = new AppMasterService(master, jarStore, system).route
+  private def appMasterRoute = new AppMasterService(master, jarStore, system).route
 
   mockAppMaster.setAutoPilot {
     new AutoPilot {
-      def run(sender: ActorRef, msg: Any) = msg match {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
         case AppMasterDataDetailRequest(appId) =>
           sender ! GeneralAppMasterSummary(appId)
           KeepRunning
@@ -78,7 +79,6 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
         case QueryExecutorConfig(0) =>
           sender ! ExecutorConfig(system.settings.config)
           KeepRunning
-
       }
     }
   }
@@ -86,7 +86,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
   val mockMaster = TestProbe()
   mockMaster.setAutoPilot {
     new AutoPilot {
-      def run(sender: ActorRef, msg: Any) = msg match {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
         case ResolveAppId(0) =>
           sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
           KeepRunning
@@ -102,20 +102,19 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   "AppMasterService" should "return a JSON structure for GET request when detail = false" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check{
+    Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check {
       val responseBody = responseAs[String]
       read[AppMasterData](responseBody)
 
-      // check the header, should contains no-cache header.
+      // Checks the header, which should contain a no-cache header.
       // Cache-Control:no-cache, max-age=0
       val noCache = header[`Cache-Control`].get.value()
       assert(noCache == "no-cache, max-age=0")
     }
 
-    Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check{
+    Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check {
       val responseBody = responseAs[String]
     }
-
   }
 
   "MetricsQueryService" should "return history metrics" in {
@@ -137,7 +136,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   "ConfigQueryService" should "return config for application" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
@@ -146,7 +145,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   it should "return config for executor " in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
@@ -155,14 +154,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   it should "return return executor summary" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check {
       val responseBody = responseAs[String]
       val executorSummary = read[ExecutorSummary](responseBody)
       assert(executorSummary.id == 0)
     }
   }
 
-
   override def afterAll {
     TestKit.shutdownActorSystem(system)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
index 1cfc01a..a24db18 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,55 +19,55 @@
 package io.gearpump.services
 
 import java.io.File
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Success, Try}
 
 import akka.actor.ActorRef
 import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.model._
-import akka.http.scaladsl.model.headers.{`Cache-Control`, `Set-Cookie`}
-import akka.stream.scaladsl.Source
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.stream.scaladsl.{FileIO, Source}
 import akka.testkit.TestActor.{AutoPilot, KeepRunning}
 import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory}
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
 import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
 import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
 import io.gearpump.cluster.MasterToClient._
 import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.WorkerSummary
-import io.gearpump.services.MasterService.{SubmitApplicationRequest, BuiltinPartitioners}
+import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
 import io.gearpump.jarstore.JarStoreService
-import io.gearpump.streaming.ProcessorDescription
-import io.gearpump.util.{Constants, Graph}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import scala.concurrent.{Future, ExecutionContext}
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-import akka.stream.scaladsl.FileIO
+import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
+import io.gearpump.streaming.ProcessorDescription
+import io.gearpump.util.Graph
 
-class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
-  Matchers with BeforeAndAfterAll {
+class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
+  with Matchers with BeforeAndAfterAll {
   import upickle.default.{read, write}
 
   override def testConfig: Config = TestUtil.UI_CONFIG
 
-  def actorRefFactory = system
+  private def actorRefFactory = system
   val workerId = 0
   val mockWorker = TestProbe()
 
   lazy val jarStoreService = JarStoreService.get(system.settings.config)
 
-  def master = mockMaster.ref
+  private def master = mockMaster.ref
 
   def jarStore: JarStoreService = jarStoreService
 
-  def masterRoute = new MasterService(master, jarStore, system).route
+  private def masterRoute = new MasterService(master, jarStore, system).route
 
   mockWorker.setAutoPilot {
     new AutoPilot {
-      def run(sender: ActorRef, msg: Any) = msg match {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
         case GetWorkerData(workerId) =>
           sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId))
           KeepRunning
@@ -81,11 +81,11 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
   val mockMaster = TestProbe()
   mockMaster.setAutoPilot {
     new AutoPilot {
-      def run(sender: ActorRef, msg: Any) = msg match {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
         case GetMasterData =>
           sender ! MasterData(null)
           KeepRunning
-        case  AppMastersDataRequest =>
+        case AppMastersDataRequest =>
           sender ! AppMastersData(List.empty[AppMasterData])
           KeepRunning
         case GetAllWorkers =>
@@ -107,15 +107,14 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
     }
   }
 
-
   it should "return master info when asked" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
     (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check {
-      // check the type
+      // Checks the type
       val content = responseAs[String]
       read[MasterData](content)
 
-      // check the header, should contains no-cache header.
+      // Checks the header, which should contain a no-cache header.
       // Cache-Control:no-cache, max-age=0
       val noCache = header[`Cache-Control`].get.value()
       assert(noCache == "no-cache, max-age=0")
@@ -127,7 +126,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
   it should "return a json structure of appMastersData for GET request" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
     (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check {
-      //check the type
+      // Checks the type
       read[AppMastersData](responseAs[String])
     }
     mockMaster.expectMsg(AppMastersDataRequest)
@@ -136,7 +135,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
   it should "return a json structure of worker data for GET request" in {
     implicit val customTimeout = RouteTestTimeout(25.seconds)
     Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
-      //check the type
+      // Checks the type
       val workerListJson = responseAs[String]
       val workers = read[List[WorkerSummary]](workerListJson)
       assert(workers.size > 0)
@@ -151,7 +150,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
 
   it should "return config for master" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
@@ -170,7 +169,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
   }
 
   private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
-    val entity =  HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+    val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
       FileIO.fromFile(file, chunkSize = 100000))
 
     val body = Source.single(
@@ -185,7 +184,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
 
   "MetricsQueryService" should "return history metrics" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute)~> check {
+    (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
@@ -200,7 +199,8 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
     )
     val dag = Graph(0 ~ "partitioner" ~> 1)
     val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
-    Post(s"/api/$REST_VERSION/master/submitdag", HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
+    Post(s"/api/$REST_VERSION/master/submitdag",
+      HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
       val responseBody = responseAs[String]
       val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
       assert(submitApplicationResultValue.appId >= 0, "invalid appid")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
index 10b45b9..3cf99e9 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,23 @@
 
 package io.gearpump.services
 
-import akka.http.scaladsl.testkit.{RouteTestTimeout}
-import com.typesafe.config.Config
-import io.gearpump.cluster.TestUtil
-import io.gearpump.services.util.UpickleUtil._
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
-import akka.actor.{ActorSystem}
-import akka.http.scaladsl.server._
 import scala.concurrent.duration._
-import akka.http.scaladsl.model._
-import headers._
+
+import akka.actor.ActorSystem
 import akka.http.scaladsl.model.FormData
-import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`}
-import akka.http.scaladsl.server.AuthorizationFailedRejection
+import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _}
 import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.server.{AuthorizationFailedRejection, _}
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
-class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers with BeforeAndAfterAll {
+import io.gearpump.cluster.TestUtil
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
+
+class SecurityServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
 
   override def testConfig: Config = TestUtil.UI_CONFIG
 
@@ -43,9 +43,9 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
   it should "return 401 if not authenticated" in {
     val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
 
-    implicit val customTimeout = RouteTestTimeout(15 seconds)
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
 
-    (Get(s"/resource") ~> security.route) ~> check{
+    (Get(s"/resource") ~> security.route) ~> check {
       assert(rejection.isInstanceOf[AuthenticationFailedRejection])
     }
   }
@@ -53,10 +53,11 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
   "guest" should "get protected resource after authentication" in {
     val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
 
-    implicit val customTimeout = RouteTestTimeout(15 seconds)
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
 
     var cookie: HttpCookiePair = null
-    (Post(s"/login", FormData("username" -> "guest", "password" -> "guest")) ~> security.route) ~> check{
+    (Post(s"/login", FormData("username" -> "guest", "password" -> "guest"))
+      ~> security.route) ~> check {
       assert("{\"user\":\"guest\"}" == responseAs[String])
       assert(status.intValue() == 200)
       assert(header[`Set-Cookie`].isDefined)
@@ -65,18 +66,18 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
       cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
     }
 
-    // after authentication, everything is fine.
+    // After authentication, everything is fine.
     Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
       responseAs[String] shouldEqual "OK"
     }
 
-    // however, guest cannot access high-permission operations, like POST.
-      Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+    // However, guest cannot access high-permission operations, like POST.
+    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
       assert(rejection == AuthorizationFailedRejection)
     }
 
-    // logout, should clear the session
-    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
+    // Logout, should clear the session
+    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
       assert("{\"user\":\"guest\"}" == responseAs[String])
       assert(status.intValue() == 200)
       assert(header[`Set-Cookie`].isDefined)
@@ -85,12 +86,12 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
       assert(httpCookie.value == "deleted")
     }
 
-    // access again, rejected.
-    Get("/resource") ~>  security.route ~> check {
+    // Access again, rejected this time.
+    Get("/resource") ~> security.route ~> check {
       assert(rejection.isInstanceOf[AuthenticationFailedRejection])
     }
 
-    Post("/resource") ~>  security.route ~> check {
+    Post("/resource") ~> security.route ~> check {
       assert(rejection.isInstanceOf[AuthenticationFailedRejection])
     }
   }
@@ -98,10 +99,11 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
   "admin" should "get protected resource after authentication" in {
     val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
 
-    implicit val customTimeout = RouteTestTimeout(15 seconds)
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
 
     var cookie: HttpCookiePair = null
-    (Post(s"/login", FormData("username" -> "admin", "password" -> "admin")) ~> security.route) ~> check{
+    (Post(s"/login", FormData("username" -> "admin", "password" -> "admin"))
+      ~> security.route) ~> check {
       assert("{\"user\":\"admin\"}" == responseAs[String])
       assert(status.intValue() == 200)
       assert(header[`Set-Cookie`].isDefined)
@@ -110,7 +112,7 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
       cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
     }
 
-    // after authentication, everything is fine.
+    // After authentication, everything is fine.
     Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
       responseAs[String] shouldEqual "OK"
     }
@@ -120,8 +122,8 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
       responseAs[String] shouldEqual "OK"
     }
 
-    // logout, should clear the session
-    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
+    // Logout, should clear the session
+    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
       assert("{\"user\":\"admin\"}" == responseAs[String])
       assert(status.intValue() == 200)
       assert(header[`Set-Cookie`].isDefined)
@@ -130,8 +132,8 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest  with Matcher
       assert(httpCookie.value == "deleted")
     }
 
-    // access again, rejected.
-    Get("/resource") ~>  security.route ~> check {
+    // Access again, rejected this time.
+    Get("/resource") ~> security.route ~> check {
       assert(rejection.isInstanceOf[AuthenticationFailedRejection])
     }
 
@@ -145,7 +147,7 @@ object SecurityServiceSpec {
 
   val resource = new RouteService {
     override def route: Route = {
-      get{
+      get {
         path("resource") {
           complete("OK")
         }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
index 501776e..f61e2f5 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,41 +18,43 @@
 
 package io.gearpump.services
 
+import scala.concurrent.duration._
+import scala.util.Try
+
 import akka.http.scaladsl.model.headers.`Cache-Control`
 import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.TestUtil
 import io.gearpump.util.Constants
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
 
-import scala.util.Try
-import scala.concurrent.duration._
-
-class StaticServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers with BeforeAndAfterAll {
+class StaticServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
 
   override def testConfig: Config = TestUtil.UI_CONFIG
-  private val supervisorPath = system.settings.config.getString(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
-
+  private val supervisorPath = system.settings.config.getString(
+    Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
 
-  def route = new StaticService(system, supervisorPath).route
+  protected def route = new StaticService(system, supervisorPath).route
 
   it should "return version" in {
-    implicit val customTimeout = RouteTestTimeout(15 seconds)
-    (Get(s"/version") ~> route) ~> check{
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/version") ~> route) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(responseBody == "Unknown-Version")
 
-      // by default, it will be cached.
+      // By default, it will be cached.
       assert(header[`Cache-Control`].isEmpty)
     }
   }
 
   it should "get correct supervisor path" in {
-    implicit val customTimeout = RouteTestTimeout(15 seconds)
-    (Get(s"/supervisor-actor-path") ~> route) ~> check{
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/supervisor-actor-path") ~> route) ~> check {
       val responseBody = responseAs[String]
       val defaultSupervisorPath = ""
       assert(responseBody == defaultSupervisorPath)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
index dc8d5a7..2676a16 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,42 +18,44 @@
 
 package io.gearpump.services
 
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
 import akka.actor.ActorRef
 import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
 import akka.testkit.TestActor.{AutoPilot, KeepRunning}
 import akka.testkit.{TestKit, TestProbe}
 import com.typesafe.config.{Config, ConfigFactory}
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
 import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
 import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
 import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.WorkerSummary
+import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
 import io.gearpump.jarstore.JarStoreService
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
 
-class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers with BeforeAndAfterAll {
+class WorkerServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
 
   override def testConfig: Config = TestUtil.DEFAULT_CONFIG
 
-  def actorRefFactory = system
+  protected def actorRefFactory = system
 
   val mockWorker = TestProbe()
 
-  def master = mockMaster.ref
+  protected def master = mockMaster.ref
 
   lazy val jarStoreService = JarStoreService.get(system.settings.config)
 
-  def workerRoute = new WorkerService(master, system).route
+  protected def workerRoute = new WorkerService(master, system).route
 
   mockWorker.setAutoPilot {
     new AutoPilot {
-      def run(sender: ActorRef, msg: Any) = msg match {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
         case GetWorkerData(workerId) =>
           sender ! WorkerData(WorkerSummary.empty)
           KeepRunning
@@ -70,7 +72,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
   val mockMaster = TestProbe()
   mockMaster.setAutoPilot {
     new AutoPilot {
-      def run(sender: ActorRef, msg: Any) = msg match {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
         case ResolveWorkerId(workerId) =>
           sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
           KeepRunning
@@ -78,10 +80,10 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
     }
   }
 
-
   "ConfigQueryService" should "return config for worker" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") ~> workerRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config")
+      ~> workerRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
@@ -90,12 +92,13 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
 
   it should "return WorkerData" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") ~> workerRoute) ~> check{
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}")
+      ~> workerRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)
 
-      // check the header, should contains no-cache header.
+      // Check the header, should contain no-cache header.
       // Cache-Control:no-cache, max-age=0
       val noCache = header[`Cache-Control`].get.value()
       assert(noCache == "no-cache, max-age=0")
@@ -104,7 +107,8 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest  with Matchers
 
   "MetricsQueryService" should "return history metrics" in {
     implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") ~> workerRoute) ~> check {
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker")
+      ~> workerRoute) ~> check {
       val responseBody = responseAs[String]
       val config = Try(ConfigFactory.parseString(responseBody))
       assert(config.isSuccess)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
index 68a3506..136026a 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,23 @@
 
 package io.gearpump.services.security.oauth2
 
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.HttpEntity.Strict
 import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Query, Path}
+import akka.http.scaladsl.model.Uri.{Path, Query}
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.testkit.ScalatestRouteTest
 import com.typesafe.config.ConfigFactory
-import io.gearpump.security.Authenticator
-import io.gearpump.services.security.oauth2.impl.{CloudFoundryUAAOAuth2Authenticator, GoogleOAuth2Authenticator}
 import org.scalatest.FlatSpec
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.collection.JavaConverters._
+
+import io.gearpump.security.Authenticator
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
+import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator
 
 class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
 
@@ -71,8 +75,9 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
     val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
     val mail = "test@gearpump.io"
 
-    def accessTokenEndpoint(request: HttpRequest) = {
-      assert(request.getHeader("Authorization").get.value() == "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
+    def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
+      assert(request.getHeader("Authorization").get.value() ==
+        "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
       assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
 
       val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
@@ -85,27 +90,27 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
 
       val response =
         s"""
-          |{
-          |  "access_token": "$accessToken",
-          |  "token_type": "bearer",
-          |  "refresh_token": "$refreshToken",
-          |  "expires_in": 43199,
-          |  "scope": "openid",
-          |  "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
-          |}
+        |{
+        |  "access_token": "$accessToken",
+        |  "token_type": "bearer",
+        |  "refresh_token": "$refreshToken",
+        |  "expires_in": 43199,
+        |  "scope": "openid",
+        |  "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
+        |}
         """.stripMargin
       HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
     }
 
-    def protectedResourceEndpoint(request: HttpRequest) = {
+    def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
       assert(request.getUri().query().get("access_token").get == accessToken)
       val response =
         s"""
-          |{
-          |    "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
-          |    "user_name": "user",
-          |    "email": "$mail"
-          |}
+        |{
+        |    "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
+        |    "user_name": "user",
+        |    "email": "$mail"
+        |}
         """.stripMargin
       HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
     }
@@ -121,7 +126,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
     }
 
     val userFuture = uaa.authenticate(code)
-    val user = Await.result(userFuture, 30 seconds)
+    val user = Await.result(userFuture, 30.seconds)
     assert(user.user == mail)
     assert(user.permissionLevel == Authenticator.User.permissionLevel)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
index 58b4a34..70d8bb0 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +18,24 @@
 
 package io.gearpump.services.security.oauth2
 
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.HttpEntity.Strict
 import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Query, Path}
+import akka.http.scaladsl.model.Uri.{Path, Query}
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.testkit.ScalatestRouteTest
 import com.typesafe.config.ConfigFactory
-import io.gearpump.security.Authenticator
-import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator
-import io.gearpump.services.security.oauth2.impl.{GoogleOAuth2Authenticator, CloudFoundryUAAOAuth2Authenticator}
 import org.scalatest.FlatSpec
 
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration._
+import io.gearpump.security.Authenticator
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
+import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator
+import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator
 
 class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
 
@@ -71,7 +74,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
     val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
     val mail = "test@gearpump.io"
 
-    def accessTokenEndpoint(request: HttpRequest) = {
+    def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
 
       assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
 
@@ -85,33 +88,37 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
       assert(form("redirect_uri") == configMap("callback"))
       assert(form("scope") == GoogleOAuth2Authenticator.Scope)
 
-      val response = s"""
-           |{
-           | "access_token": "$accessToken",
-           | "token_type": "Bearer",
-           | "expires_in": 3591,
-           | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3"
-           |}
+      // scalastyle:off line.size.limit
+      val response =
+        s"""
+        |{
+        | "access_token": "$accessToken",
+        | "token_type": "Bearer",
+        | "expires_in": 3591,
+        | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3"
+        |}
         """.stripMargin
+      // scalastyle:on line.size.limit
 
       HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
     }
 
-    def protectedResourceEndpoint(request: HttpRequest) = {
+    def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
       assert(request.getUri().query().get("access_token").get == accessToken)
-      val response =s"""
-           |{
-           |   "kind": "plus#person",
-           |   "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8",
-           |   "nickname": "gearpump",
-           |   "gender": "female",
-           |   "emails": [
-           |     {
-           |       "value": "$mail",
-           |       "type": "account"
-           |     }
-           |   ]
-           | }
+      val response =
+        s"""
+        |{
+        |   "kind": "plus#person",
+        |   "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8",
+        |   "nickname": "gearpump",
+        |   "gender": "female",
+        |   "emails": [
+        |     {
+        |       "value": "$mail",
+        |       "type": "account"
+        |     }
+        |   ]
+        | }
         """.stripMargin
       HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
     }
@@ -127,7 +134,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
     }
 
     val userFuture = google.authenticate(code)
-    val user = Await.result(userFuture, 30 seconds)
+    val user = Await.result(userFuture, 30.seconds)
     assert(user.user == mail)
     assert(user.permissionLevel == Authenticator.Guest.permissionLevel)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
index 9b6b5e9..c03532d 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,18 @@
 
 package io.gearpump.services.security.oauth2
 
-import akka.actor.{ActorRef, ActorSystem}
+import scala.concurrent.{Await, Future}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
 import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.model.{HttpResponse, HttpRequest}
-import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
 import akka.stream.ActorMaterializer
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.server.Directives._
-import akka.stream.scaladsl.{Sink, Source}
-import io.gearpump.util.Util
-import akka.pattern.ask
+import akka.stream.scaladsl.Sink
 
-import scala.concurrent.{Await, Future}
+import io.gearpump.util.Util
+// NOTE: This cannot be removed!!
+import io.gearpump.services.util.UpickleUtil._
 
 /**
  * Serves as a fake OAuth2 server.
@@ -48,12 +48,11 @@ class MockOAuth2Server(
   def port: Int = _port
 
   def start(): Unit = {
-    _port = Util.findFreePort.get
+    _port = Util.findFreePort().get
 
     val serverSource = Http().bind(interface = "127.0.0.1", port = _port)
     bindingFuture = {
       serverSource.to(Sink.foreach { connection =>
-        println("Accepted new connection from " + connection.remoteAddress)
         connection handleWithSyncHandler requestHandler
       }).run()
     }
@@ -61,6 +60,6 @@ class MockOAuth2Server(
 
   def stop(): Unit = {
     import scala.concurrent.duration._
-    Await.result(bindingFuture.map(_.unbind()), 120 seconds)
+    Await.result(bindingFuture.map(_.unbind()), 120.seconds)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
index 9b3069e..b074813 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
 
 package io.gearpump.services.util
 
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import upickle.default.{read, write}
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.metrics.Metrics.{Counter, MetricType}
 import io.gearpump.services.util.UpickleUtil._
 import io.gearpump.streaming.ProcessorId
 import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
 import io.gearpump.util.Graph
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import upickle.default.{read, write}
 
 class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/README.md
----------------------------------------------------------------------
diff --git a/streaming/README.md b/streaming/README.md
deleted file mode 100644
index 86d4ec2..0000000
--- a/streaming/README.md
+++ /dev/null
@@ -1,116 +0,0 @@
-A very basic DSL which support flatmap, reduce, and etc..
-======================
-
-
-Supported operators:
-------------------
-```scala
-class Stream[T](dag: Graph[Op, OpEdge], private val thisNode: Op, private val edge: Option[OpEdge] = None) {
-
-  /**
-   * convert a value[T] to a list of value[R]
-   * @param fun
-   * @tparam R
-   * @return
-   */
-  def flatMap[R](fun: T => TraversableOnce[R]): Stream[R]
-
-  /**
-   * convert value[T] to value[R]
-   * @param fun
-   * @tparam R
-   * @return
-   */
-  def map[R](fun: T => R): Stream[R]
-
-  /**
-   * reserve records when fun(T) == true
-   * @param fun
-   * @return
-   */
-  def filter(fun: T => Boolean): Stream[T]
-
-  /**
-   * Reduce opeartion
-   * @param fun
-   * @return
-   */
-  def reduce(fun: (T, T) => T): Stream[T]
-
-  /**
-   * Log to task log file
-   */
-  def log(): Unit
-
-  /**
-   * Merge data from two stream into one
-   * @param other
-   * @return
-   */
-  def merge(other: Stream[T]): Stream[T]
-
-  /**
-   * Group by fun(T)
-   *
-   * For example, we have T type, People(name: String, gender: String, age: Int)
-   * groupBy[People](_.gender) will group the people by gender.
-   *
-   * You can append other combinators after groupBy
-   *
-   * For example,
-   *
-   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
-   *
-   * @param fun
-   * @param parallism
-   * @tparam Group
-   * @return
-   */
-  def groupBy[Group](fun: T => Group, parallism: Int = 1): Stream[T]
-
-  /**
-   * connect with a low level Processor(TaskDescription)
-   * @param processor
-   * @param parallism
-   * @tparam R
-   * @return
-   */
-  def process[R](processor: Class[_ <: Task], parallism: Int): Stream[R]
-}
-
-```
-
-How to define the DSL
----------------
-WordCount:
-
-```scala
-val context = ClientContext(master)
-    val app = StreamApp("dsl", context)
-
-    val data = "This is a good start, bingo!! bingo!!"
-    app.fromCollection(data.lines.toList).
-      // word => (word, count)
-      flatMap(line => line.split("[\\s]+")).map((_, 1)).
-      // (word, count1), (word, count2) => (word, count1 + count2)
-      groupBy(kv => kv._1).reduce((left, right) => (left._1, left._2 + right._2))
-
-    val appId = context.submit(app)
-    context.close()
-```
-
-For the full example, please check https://github.com/intel-hadoop/gearpump/tree/master/experiments/dsl/src/main/scala/io.gearpump/streaming/dsl/example
-
-
-Run an example
----------------------
-```bash
-# start master
-bin\local
-
-# start UI
-bin\services
-
-# start example topology
-bin\gear io.gearpump.streaming.dsl.example.WordCount
-```
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
index 8b81253..dbae1c4 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,8 +19,15 @@
 package io.gearpump.streaming.javaapi;
 
 import io.gearpump.partitioner.Partitioner;
+import io.gearpump.streaming.Processor;
+import io.gearpump.streaming.task.Task;
 
-public class Graph extends io.gearpump.util.Graph<io.gearpump.streaming.Processor<? extends io.gearpump.streaming.task.Task>, Partitioner> {
+/**
+ * Java version of Graph
+ *
+ * See {@link io.gearpump.util.Graph}
+ */
+public class Graph extends io.gearpump.util.Graph<Processor<? extends Task>, Partitioner> {
 
   public Graph() {
     super(null, null);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
index 52b5d61..974183e 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,6 +27,11 @@ import io.gearpump.streaming.source.DataSource;
 import io.gearpump.streaming.source.DataSourceProcessor;
 import io.gearpump.streaming.source.DataSourceTask;
 
+/**
+ * Java version of Processor
+ *
+ * See {@link io.gearpump.streaming.Processor}
+ */
 public class Processor<T extends io.gearpump.streaming.task.Task> implements io.gearpump.streaming.Processor<T> {
   private Class<T> _taskClass;
   private int _parallelism = 1;
@@ -43,46 +48,49 @@ public class Processor<T extends io.gearpump.streaming.task.Task> implements io.
   }
 
   /**
-   * Create a Sink Processor
-   * @param dataSink  the data sink itself
-   * @param parallelism  the parallelism of this processor
-   * @param description  the description for this processor
-   * @param taskConf     the configuration for this processor
-   * @param system       actor system
-   * @return   the new created sink processor
+   * Creates a Sink Processor
+   *
+   * @param dataSink    the data sink itself
+   * @param parallelism the parallelism of this processor
+   * @param description the description for this processor
+   * @param taskConf    the configuration for this processor
+   * @param system      actor system
+   * @return the newly created sink processor
    */
-  public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description,  UserConfig taskConf, ActorSystem system) {
+  public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
     io.gearpump.streaming.Processor<DataSinkTask> p = DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system);
     return new Processor(p);
   }
 
   /**
-   * Create a Source Processor
-   * @param source    the data source itself
-   * @param parallelism   the parallelism of this processor
-   * @param description   the description of this processor
-   * @param taskConf      the configuration of this processor
-   * @param system        actor system
-   * @return              the new created source processor
+   * Creates a Source Processor
+   *
+   * @param source      the data source itself
+   * @param parallelism the parallelism of this processor
+   * @param description the description of this processor
+   * @param taskConf    the configuration of this processor
+   * @param system      actor system
+   * @return the newly created source processor
    */
-  public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description,  UserConfig taskConf, ActorSystem system) {
+  public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
     io.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
     return new Processor(p);
   }
 
   public Processor(io.gearpump.streaming.Processor<T> processor) {
-    this._taskClass = (Class)(processor.taskClass());
+    this._taskClass = (Class) (processor.taskClass());
     this._parallelism = processor.parallelism();
     this._description = processor.description();
     this._userConf = processor.taskConf();
   }
 
   /**
-   *  Create a general processor with user specified task logic.
-   * @param taskClass  task implementation class of this processor (shall be a derived class from {@link Task}
+   * Creates a general processor with user specified task logic.
+   *
+   * @param taskClass    task implementation class of this processor (shall be a derived class from {@link Task})
    * @param parallelism, how many initial tasks you want to use
    * @param description, some text to describe this processor
-   * @param taskConf, Processor specific configuration
+   * @param taskConf,    Processor specific configuration
    */
   public Processor(Class<T> taskClass, int parallelism, String description, UserConfig taskConf) {
     this._taskClass = taskClass;

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
index d017793..150a26f 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.streaming.javaapi;
 
 import akka.actor.ActorSystem;
@@ -24,21 +23,25 @@ import io.gearpump.cluster.Application;
 import io.gearpump.cluster.ApplicationMaster;
 import io.gearpump.cluster.UserConfig;
 
-
+/**
+ * Java version of StreamApplication.
+ *
+ * Also see {@link io.gearpump.streaming.StreamApplication}
+ */
 public class StreamApplication implements Application {
   private io.gearpump.streaming.StreamApplication app;
   /**
-   * Create a streaming application
-   * @param name name of the application
-   * @param conf  user configuration
-   * @param graph  the DAG
+   * Creates a streaming application
    *
+   * @param name  Name of the application
+   * @param conf  User configuration
+   * @param graph The DAG
    */
   public StreamApplication(String name, UserConfig conf, Graph graph) {
     //by pass the tricky type check in scala 2.10
     io.gearpump.util.Graph untypedGraph = graph;
     this.app = io.gearpump.streaming.StreamApplication.apply(
-        name, untypedGraph, conf);
+      name, untypedGraph, conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
index 836287d..45fae19 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,6 +24,11 @@ import io.gearpump.cluster.UserConfig;
 import io.gearpump.streaming.task.StartTime;
 import io.gearpump.streaming.task.TaskContext;
 
+/**
+ * Java version of Task.
+ *
+ * See {@link io.gearpump.streaming.task.Task}
+ */
 public class Task extends io.gearpump.streaming.task.Task {
   protected TaskContext context;
   protected UserConfig userConf;
@@ -40,11 +45,14 @@ public class Task extends io.gearpump.streaming.task.Task {
   }
 
   @Override
-  public void onStart(StartTime startTime){}
+  public void onStart(StartTime startTime) {
+  }
 
   @Override
-  public void onNext(Message msg){}
+  public void onNext(Message msg) {
+  }
 
   @Override
-  public void onStop(){}
+  public void onStop() {
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
index e4e137f..bb97442 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,9 @@ package io.gearpump.streaming.javaapi.dsl.functions;
 import java.io.Serializable;
 
 /**
- * a function that decides whether to reserve a value<T>
+ * Filter function
+ *
+ * @param <T> Message of type T
  */
 public interface FilterFunction<T> extends Serializable {
   boolean apply(T t);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
index b65a338..3e18cf1 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,7 +22,10 @@ import java.io.Serializable;
 import java.util.Iterator;
 
 /**
- * a function that converts a value<T> to a iterator of value<R>
+ * Function that converts a value of type T to an iterator of values of type R.
+ *
+ * @param <T> Input value type
+ * @param <R> Return value type
  */
 public interface FlatMapFunction<T, R> extends Serializable {
   Iterator<R> apply(T t);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
index 651c477..2ba524e 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,10 @@ package io.gearpump.streaming.javaapi.dsl.functions;
 import java.io.Serializable;
 
 /**
- * a function that puts a value<T> into a Group
+ * GroupBy function which assigns a value of type T to a group
+ *
+ * @param <T> Input value type
+ * @param <Group> Group Type
  */
 public interface GroupByFunction<T, Group> extends Serializable {
   Group apply(T t);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
index a30a671..b4bd6ac 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,10 @@ package io.gearpump.streaming.javaapi.dsl.functions;
 import java.io.Serializable;
 
 /**
- * a function that converts a value<T> to value<R>
+ * Function that maps a value of type T to a value of type R
+ *
+ * @param <T> Input value type
+ * @param <R> Output value type
  */
 public interface MapFunction<T, R> extends Serializable {
   R apply(T t);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
index 0f4bb18..f439c0a 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,9 @@ package io.gearpump.streaming.javaapi.dsl.functions;
 import java.io.Serializable;
 
 /**
- * a function that applies reduce operation
+ * Function that applies reduce operation
+ *
+ * @param <T> Input value type
  */
 public interface ReduceFunction<T> extends Serializable {
   T apply(T t1, T t2);

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
index 370b36c..06cf022 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,29 +18,34 @@
 
 package io.gearpump.streaming
 
+import scala.language.existentials
+
 import akka.actor.ActorRef
-import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
-import io.gearpump.streaming.task.{TaskId, Subscriber}
+
 import io.gearpump.TimeStamp
 import io.gearpump.cluster.appmaster.WorkerInfo
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
+import io.gearpump.streaming.task.{Subscriber, TaskId}
 import io.gearpump.transport.HostPort
 
-import scala.language.existentials
-
 object AppMasterToExecutor {
-  case class LaunchTasks(taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription, subscribers: List[Subscriber])
+  case class LaunchTasks(
+      taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription,
+      subscribers: List[Subscriber])
 
   case object TasksLaunched
 
   /**
    * dagVersion, life, and subscribers will be changed on target task list.
    */
-  case class ChangeTasks(taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+  case class ChangeTasks(
+      taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
 
   case class TasksChanged(taskIds: List[TaskId])
 
-  case class ChangeTask(taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+  case class ChangeTask(
+      taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
 
   case class TaskChanged(taskId: TaskId, dagVersion: Int)
 
@@ -52,7 +57,8 @@ object AppMasterToExecutor {
 
   case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId)
 
-  case class TaskLocationsRejected(dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable)
+  case class TaskLocationsRejected(
+      dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable)
 
   case class StartAllTasks(dagVersion: Int)
 
@@ -65,10 +71,11 @@ object AppMasterToExecutor {
 }
 
 object ExecutorToAppMaster {
-  case class RegisterExecutor(executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo)
+  case class RegisterExecutor(
+      executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo)
 
-  case class RegisterTask(taskId: TaskId, executorId : Int, task: HostPort)
-  case class UnRegisterTask(taskId: TaskId, executorId : Int)
+  case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort)
+  case class UnRegisterTask(taskId: TaskId, executorId: Int)
 
   case class MessageLoss(executorId: Int, taskId: TaskId, cause: String)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
index fa23807..becf31a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.streaming
 
 object Constants {
@@ -11,6 +29,9 @@ object Constants {
 
   val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS = "gearpump.streaming.register-task-timeout-ms"
 
-  val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT = "gearpump.streaming.max-pending-message-count-per-connection"
-  val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = "gearpump.streaming.ack-once-every-message-count"
+  val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT =
+    "gearpump.streaming.max-pending-message-count-per-connection"
+
+  val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT =
+    "gearpump.streaming.ack-once-every-message-count"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
index a6b8cba..a73fd48 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,16 @@
 
 package io.gearpump.streaming
 
-import io.gearpump.streaming.task.TaskId
 import io.gearpump.partitioner.PartitionerDescription
+import io.gearpump.streaming.task.TaskId
 import io.gearpump.util.Graph
 
 /**
- * DAG is wrapper for [[Graph]] for streaming applications.
+ * DAG is a wrapper for [[io.gearpump.util.Graph]] for streaming applications.
  */
-case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription], graph : Graph[ProcessorId, PartitionerDescription]) extends Serializable {
+case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription],
+    graph : Graph[ProcessorId, PartitionerDescription])
+  extends Serializable {
 
   def isEmpty: Boolean = {
     processors.isEmpty
@@ -46,15 +48,15 @@ case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription]
 }
 
 object DAG {
-  def apply (graph : Graph[ProcessorDescription, PartitionerDescription], version: Int = 0) : DAG = {
-    val processors = graph.vertices.map{processorDescription =>
+  def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = {
+    val processors = graph.vertices.map { processorDescription =>
       (processorDescription.id, processorDescription)
     }.toMap
-    val dag = graph.mapVertex{ processor =>
+    val dag = graph.mapVertex { processor =>
       processor.id
     }
     new DAG(version, processors, dag)
   }
 
-  def empty() = apply(Graph.empty[ProcessorDescription, PartitionerDescription])
+  def empty: DAG = apply(Graph.empty[ProcessorDescription, PartitionerDescription])
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
index 4eabcd2..8eb866d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -75,7 +75,7 @@ class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckReques
   }
 }
 
-class AckRequestSerializer extends TaskMessageSerializer[AckRequest]{
+class AckRequestSerializer extends TaskMessageSerializer[AckRequest] {
   val taskIdSerializer = new TaskIdSerializer
 
   override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
index b68054a..0ca4d92 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,20 @@
 
 package io.gearpump.streaming
 
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
 import akka.actor.ActorSystem
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.task.Task
+
 import io.gearpump.TimeStamp
 import io.gearpump.cluster._
 import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
-import io.gearpump.util.{LogUtil, Graph, ReferenceEqual}
-
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
+import io.gearpump.streaming.appmaster.AppMaster
+import io.gearpump.streaming.task.Task
+import io.gearpump.util.{Graph, LogUtil, ReferenceEqual}
 
 /**
  * Processor is the blueprint for tasks.
- *
  */
 trait Processor[+T <: Task] extends ReferenceEqual {
 
@@ -41,7 +41,7 @@ trait Processor[+T <: Task] extends ReferenceEqual {
   def parallelism: Int
 
   /**
-   * The custom [[UserConfig]], it is used to initialize a task in runtime.
+   * The custom [[io.gearpump.cluster.UserConfig]], used to initialize a task at runtime.
    */
   def taskConf: UserConfig
 
@@ -59,20 +59,29 @@ trait Processor[+T <: Task] extends ReferenceEqual {
 }
 
 object Processor {
-  def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task]): ProcessorDescription = {
+  def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task])
+    : ProcessorDescription = {
     import processor._
     ProcessorDescription(id, taskClass.getName, parallelism, description, taskConf)
   }
 
-  def apply[T<: Task](parallelism : Int, description: String = "", taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T]): DefaultProcessor[T] = {
-    new DefaultProcessor[T](parallelism, description, taskConf, classtag.runtimeClass.asInstanceOf[Class[T]])
+  def apply[T<: Task](
+      parallelism : Int, description: String = "",
+      taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T])
+    : DefaultProcessor[T] = {
+    new DefaultProcessor[T](parallelism, description, taskConf,
+      classtag.runtimeClass.asInstanceOf[Class[T]])
   }
 
-  def apply[T<: Task](taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig): DefaultProcessor[T] = {
+  def apply[T<: Task](
+      taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig)
+    : DefaultProcessor[T] = {
     new DefaultProcessor[T](parallelism, description, taskConf, taskClazz)
   }
 
-  case class DefaultProcessor[T<: Task](parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T]) extends Processor[T] {
+  case class DefaultProcessor[T<: Task](
+      parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T])
+    extends Processor[T] {
 
     def withParallelism(parallel: Int): DefaultProcessor[T] = {
       new DefaultProcessor[T](parallel, description, taskConf, taskClass)
@@ -93,7 +102,6 @@ object Processor {
  *
  * When input message's timestamp is beyond current processor's lifetime,
  * then it will not be processed by this processor.
- *
  */
 case class LifeTime(birth: TimeStamp, death: TimeStamp) {
   def contains(timestamp: TimeStamp): Boolean = {
@@ -112,7 +120,9 @@ object LifeTime {
 /**
  * Represent a streaming application
  */
-class StreamApplication(override val name : String,  val inputUserConfig: UserConfig, val dag: Graph[ProcessorDescription, PartitionerDescription])
+class StreamApplication(
+    override val name: String, val inputUserConfig: UserConfig,
+    val dag: Graph[ProcessorDescription, PartitionerDescription])
   extends Application {
 
   require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
@@ -137,19 +147,21 @@ object StreamApplication {
   private val hashPartitioner = new HashPartitioner()
   private val LOG = LogUtil.getLogger(getClass)
 
-  def apply[T <: Processor[Task], P <: Partitioner] (name : String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
-    import Processor._
+  def apply[T <: Processor[Task], P <: Partitioner](
+      name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
+    import io.gearpump.streaming.Processor._
 
     if (dag.hasCycle()) {
       LOG.warn(s"Detected cycles in DAG of application $name!")
     }
 
     val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
-    val graph = dag.mapVertex {processor =>
+    val graph = dag.mapVertex { processor =>
       val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
       updatedProcessor
     }.mapEdge { (node1, edge, node2) =>
-      PartitionerDescription(new PartitionerObject(Option(edge).getOrElse(StreamApplication.hashPartitioner)))
+      PartitionerDescription(new PartitionerObject(
+        Option(edge).getOrElse(StreamApplication.hashPartitioner)))
     }
     new StreamApplication(name, userConfig, graph)
   }