You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:23 UTC

[13/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
deleted file mode 100644
index 59da80d..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-
-import akka.actor.ActorRef
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.testkit.TestActor.{AutoPilot, KeepRunning}
-import akka.testkit.{TestKit, TestProbe}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.slf4j.Logger
-import upickle.default.read
-
-import io.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
-import io.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
-import io.gearpump.cluster.MasterToClient._
-import io.gearpump.cluster.TestUtil
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
-import io.gearpump.util.LogUtil
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
-  with Matchers with BeforeAndAfterAll {
-
-  override def testConfig: Config = TestUtil.UI_CONFIG
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private def actorRefFactory = system
-
-  val mockAppMaster = TestProbe()
-  val failure = LastFailure(System.currentTimeMillis(), "Some error")
-
-  lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
-  def jarStore: JarStoreService = jarStoreService
-
-  private def master = mockMaster.ref
-
-  private def appMasterRoute = new AppMasterService(master, jarStore, system).route
-
-  mockAppMaster.setAutoPilot {
-    new AutoPilot {
-      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
-        case AppMasterDataDetailRequest(appId) =>
-          sender ! GeneralAppMasterSummary(appId)
-          KeepRunning
-        case QueryHistoryMetrics(path, _, _, _) =>
-          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
-          KeepRunning
-        case GetLastFailure(appId) =>
-          sender ! failure
-          KeepRunning
-        case GetExecutorSummary(0) =>
-          sender ! ExecutorSummary.empty
-          KeepRunning
-        case QueryExecutorConfig(0) =>
-          sender ! ExecutorConfig(system.settings.config)
-          KeepRunning
-      }
-    }
-  }
-
-  val mockMaster = TestProbe()
-  mockMaster.setAutoPilot {
-    new AutoPilot {
-      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
-        case ResolveAppId(0) =>
-          sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
-          KeepRunning
-        case AppMasterDataRequest(appId, _) =>
-          sender ! AppMasterData("active")
-          KeepRunning
-        case QueryAppMasterConfig(appId) =>
-          sender ! AppMasterConfig(null)
-          KeepRunning
-      }
-    }
-  }
-
-  "AppMasterService" should "return a JSON structure for GET request when detail = false" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check {
-      val responseBody = responseAs[String]
-      read[AppMasterData](responseBody)
-
-      // Checks the header; it should contain the no-cache directive.
-      // Cache-Control:no-cache, max-age=0
-      val noCache = header[`Cache-Control`].get.value()
-      assert(noCache == "no-cache, max-age=0")
-    }
-
-    Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check {
-      val responseBody = responseAs[String]
-    }
-  }
-
-  "MetricsQueryService" should "return history metrics" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/metrics/processor") ~> appMasterRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-    }
-  }
-
-  "AppMaster" should "return lastest error" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/errors") ~> appMasterRoute) ~> check {
-      val responseBody = responseAs[String]
-      assert(read[LastFailure](responseBody) == failure)
-    }
-  }
-
-  "ConfigQueryService" should "return config for application" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-    }
-  }
-
-  it should "return config for executor " in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-    }
-  }
-
-  it should "return return executor summary" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check {
-      val responseBody = responseAs[String]
-      val executorSummary = read[ExecutorSummary](responseBody)
-      assert(executorSummary.id == 0)
-    }
-  }
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
deleted file mode 100644
index a24db18..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import java.io.File
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Success, Try}
-
-import akka.actor.ActorRef
-import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.stream.scaladsl.{FileIO, Source}
-import akka.testkit.TestActor.{AutoPilot, KeepRunning}
-import akka.testkit.TestProbe
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
-import io.gearpump.cluster.MasterToClient._
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.streaming.ProcessorDescription
-import io.gearpump.util.Graph
-
-class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
-  with Matchers with BeforeAndAfterAll {
-  import upickle.default.{read, write}
-
-  override def testConfig: Config = TestUtil.UI_CONFIG
-
-  private def actorRefFactory = system
-  val workerId = 0
-  val mockWorker = TestProbe()
-
-  lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
-  private def master = mockMaster.ref
-
-  def jarStore: JarStoreService = jarStoreService
-
-  private def masterRoute = new MasterService(master, jarStore, system).route
-
-  mockWorker.setAutoPilot {
-    new AutoPilot {
-      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
-        case GetWorkerData(workerId) =>
-          sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId))
-          KeepRunning
-        case QueryHistoryMetrics(path, _, _, _) =>
-          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
-          KeepRunning
-      }
-    }
-  }
-
-  val mockMaster = TestProbe()
-  mockMaster.setAutoPilot {
-    new AutoPilot {
-      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
-        case GetMasterData =>
-          sender ! MasterData(null)
-          KeepRunning
-        case AppMastersDataRequest =>
-          sender ! AppMastersData(List.empty[AppMasterData])
-          KeepRunning
-        case GetAllWorkers =>
-          sender ! WorkerList(List(WorkerId(0, 0L)))
-          KeepRunning
-        case ResolveWorkerId(WorkerId(0, 0L)) =>
-          sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
-          KeepRunning
-        case QueryHistoryMetrics(path, _, _, _) =>
-          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
-          KeepRunning
-        case QueryMasterConfig =>
-          sender ! MasterConfig(null)
-          KeepRunning
-        case submit: SubmitApplication =>
-          sender ! SubmitApplicationResult(Success(0))
-          KeepRunning
-      }
-    }
-  }
-
-  it should "return master info when asked" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check {
-      // Checks the type
-      val content = responseAs[String]
-      read[MasterData](content)
-
-      // Checks the header; it should contain the no-cache directive.
-      // Cache-Control:no-cache, max-age=0
-      val noCache = header[`Cache-Control`].get.value()
-      assert(noCache == "no-cache, max-age=0")
-    }
-
-    mockMaster.expectMsg(GetMasterData)
-  }
-
-  it should "return a json structure of appMastersData for GET request" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check {
-      // Checks the type
-      read[AppMastersData](responseAs[String])
-    }
-    mockMaster.expectMsg(AppMastersDataRequest)
-  }
-
-  it should "return a json structure of worker data for GET request" in {
-    implicit val customTimeout = RouteTestTimeout(25.seconds)
-    Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
-      // Checks the type
-      val workerListJson = responseAs[String]
-      val workers = read[List[WorkerSummary]](workerListJson)
-      assert(workers.size > 0)
-      workers.foreach { worker =>
-        worker.state shouldBe "active"
-      }
-    }
-    mockMaster.expectMsg(GetAllWorkers)
-    mockMaster.expectMsgType[ResolveWorkerId]
-    mockWorker.expectMsgType[GetWorkerData]
-  }
-
-  it should "return config for master" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-    }
-    mockMaster.expectMsg(QueryMasterConfig)
-  }
-
-  "submit invalid application" should "return an error" in {
-    implicit val routeTestTimeout = RouteTestTimeout(30.second)
-    val tempfile = new File("foo")
-    val request = entity(tempfile)
-
-    Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check {
-      assert(response.status.intValue == 500)
-    }
-  }
-
-  private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
-    val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
-      FileIO.fromFile(file, chunkSize = 100000))
-
-    val body = Source.single(
-      Multipart.FormData.BodyPart(
-        "file",
-        entity,
-        Map("filename" -> file.getName)))
-    val form = Multipart.FormData(body)
-
-    Marshal(form).to[RequestEntity]
-  }
-
-  "MetricsQueryService" should "return history metrics" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-    }
-  }
-
-  "submitDag" should "submit a SubmitApplicationRequest and get an appId > 0" in {
-    import io.gearpump.util.Graph._
-    val processors = Map(
-      0 -> ProcessorDescription(0, "A", parallelism = 1),
-      1 -> ProcessorDescription(1, "B", parallelism = 1)
-    )
-    val dag = Graph(0 ~ "partitioner" ~> 1)
-    val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
-    Post(s"/api/$REST_VERSION/master/submitdag",
-      HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
-      val responseBody = responseAs[String]
-      val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
-      assert(submitApplicationResultValue.appId >= 0, "invalid appid")
-    }
-  }
-
-  "MasterService" should "return Gearpump built-in partitioner list" in {
-    (Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute) ~> check {
-      val responseBody = responseAs[String]
-      val partitioners = read[BuiltinPartitioners](responseBody)
-      assert(partitioners.partitioners.length > 0, "invalid response")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
deleted file mode 100644
index 3cf99e9..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.FormData
-import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.{AuthorizationFailedRejection, _}
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class SecurityServiceSpec
-  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
-  override def testConfig: Config = TestUtil.UI_CONFIG
-
-  implicit def actorSystem: ActorSystem = system
-
-  it should "return 401 if not authenticated" in {
-    val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
-
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-
-    (Get(s"/resource") ~> security.route) ~> check {
-      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
-    }
-  }
-
-  "guest" should "get protected resource after authentication" in {
-    val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
-
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-
-    var cookie: HttpCookiePair = null
-    (Post(s"/login", FormData("username" -> "guest", "password" -> "guest"))
-      ~> security.route) ~> check {
-      assert("{\"user\":\"guest\"}" == responseAs[String])
-      assert(status.intValue() == 200)
-      assert(header[`Set-Cookie`].isDefined)
-      val httpCookie = header[`Set-Cookie`].get.cookie
-      assert(httpCookie.name == "gearpump_token")
-      cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
-    }
-
-    // After authentication, everything is fine.
-    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
-      responseAs[String] shouldEqual "OK"
-    }
-
-    // However, guest cannot access high-permission operations, like POST.
-    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
-      assert(rejection == AuthorizationFailedRejection)
-    }
-
-    // Logout, should clear the session
-    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
-      assert("{\"user\":\"guest\"}" == responseAs[String])
-      assert(status.intValue() == 200)
-      assert(header[`Set-Cookie`].isDefined)
-      val httpCookie = header[`Set-Cookie`].get.cookie
-      assert(httpCookie.name == "gearpump_token")
-      assert(httpCookie.value == "deleted")
-    }
-
-    // Access again, rejected this time.
-    Get("/resource") ~> security.route ~> check {
-      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
-    }
-
-    Post("/resource") ~> security.route ~> check {
-      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
-    }
-  }
-
-  "admin" should "get protected resource after authentication" in {
-    val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
-
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-
-    var cookie: HttpCookiePair = null
-    (Post(s"/login", FormData("username" -> "admin", "password" -> "admin"))
-      ~> security.route) ~> check {
-      assert("{\"user\":\"admin\"}" == responseAs[String])
-      assert(status.intValue() == 200)
-      assert(header[`Set-Cookie`].isDefined)
-      val httpCookie = header[`Set-Cookie`].get.cookie
-      assert(httpCookie.name == "gearpump_token")
-      cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
-    }
-
-    // After authentication, everything is fine.
-    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
-      responseAs[String] shouldEqual "OK"
-    }
-
-    // Unlike guest, admin can also access POST.
-    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
-      responseAs[String] shouldEqual "OK"
-    }
-
-    // Logout, should clear the session
-    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
-      assert("{\"user\":\"admin\"}" == responseAs[String])
-      assert(status.intValue() == 200)
-      assert(header[`Set-Cookie`].isDefined)
-      val httpCookie = header[`Set-Cookie`].get.cookie
-      assert(httpCookie.name == "gearpump_token")
-      assert(httpCookie.value == "deleted")
-    }
-
-    // Access again, rejected this time.
-    Get("/resource") ~> security.route ~> check {
-      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
-    }
-
-    Post("/resource") ~> security.route ~> check {
-      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
-    }
-  }
-}
-
-object SecurityServiceSpec {
-
-  val resource = new RouteService {
-    override def route: Route = {
-      get {
-        path("resource") {
-          complete("OK")
-        }
-      } ~
-      post {
-        path("resource") {
-          complete("OK")
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
deleted file mode 100644
index f61e2f5..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-import scala.util.Try
-
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.Constants
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class StaticServiceSpec
-  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
-  override def testConfig: Config = TestUtil.UI_CONFIG
-  private val supervisorPath = system.settings.config.getString(
-    Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
-
-  protected def route = new StaticService(system, supervisorPath).route
-
-  it should "return version" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/version") ~> route) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(responseBody == "Unknown-Version")
-
-      // By default, no Cache-Control header is set on this response.
-      assert(header[`Cache-Control`].isEmpty)
-    }
-  }
-
-  it should "get correct supervisor path" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/supervisor-actor-path") ~> route) ~> check {
-      val responseBody = responseAs[String]
-      val defaultSupervisorPath = ""
-      assert(responseBody == defaultSupervisorPath)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
deleted file mode 100644
index 2676a16..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-
-import akka.actor.ActorRef
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.testkit.TestActor.{AutoPilot, KeepRunning}
-import akka.testkit.{TestKit, TestProbe}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import io.gearpump.jarstore.JarStoreService
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class WorkerServiceSpec
-  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
-  override def testConfig: Config = TestUtil.DEFAULT_CONFIG
-
-  protected def actorRefFactory = system
-
-  val mockWorker = TestProbe()
-
-  protected def master = mockMaster.ref
-
-  lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
-  protected def workerRoute = new WorkerService(master, system).route
-
-  mockWorker.setAutoPilot {
-    new AutoPilot {
-      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
-        case GetWorkerData(workerId) =>
-          sender ! WorkerData(WorkerSummary.empty)
-          KeepRunning
-        case QueryWorkerConfig(workerId) =>
-          sender ! WorkerConfig(null)
-          KeepRunning
-        case QueryHistoryMetrics(path, _, _, _) =>
-          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
-          KeepRunning
-      }
-    }
-  }
-
-  val mockMaster = TestProbe()
-  mockMaster.setAutoPilot {
-    new AutoPilot {
-      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
-        case ResolveWorkerId(workerId) =>
-          sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
-          KeepRunning
-      }
-    }
-  }
-
-  "ConfigQueryService" should "return config for worker" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config")
-      ~> workerRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-    }
-  }
-
-  it should "return WorkerData" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}")
-      ~> workerRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-
-      // Checks the header; it should contain the no-cache directive.
-      // Cache-Control:no-cache, max-age=0
-      val noCache = header[`Cache-Control`].get.value()
-      assert(noCache == "no-cache, max-age=0")
-    }
-  }
-
-  "MetricsQueryService" should "return history metrics" in {
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker")
-      ~> workerRoute) ~> check {
-      val responseBody = responseAs[String]
-      val config = Try(ConfigFactory.parseString(responseBody))
-      assert(config.isSuccess)
-    }
-  }
-
-  override def afterAll {
-    TestKit.shutdownActorSystem(system)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
deleted file mode 100644
index 136026a..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpEntity.Strict
-import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.testkit.ScalatestRouteTest
-import com.typesafe.config.ConfigFactory
-import org.scalatest.FlatSpec
-
-import io.gearpump.security.Authenticator
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator
-
-class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
-
-  implicit val actorSystem: ActorSystem = system
-  private val server = new MockOAuth2Server(system, null)
-  server.start()
-  private val serverHost = s"http://127.0.0.1:${server.port}"
-
-  val configMap = Map(
-    "class" -> "io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator",
-    "callback" -> s"$serverHost/login/oauth2/cloudfoundryuaa/callback",
-    "clientid" -> "gearpump_test2",
-    "clientsecret" -> "gearpump_test2",
-    "default-userrole" -> "user",
-    "icon" -> "/icons/uaa.png",
-    "uaahost" -> serverHost,
-    "additional-authenticator-enabled" -> "false")
-
-  val configString = ConfigFactory.parseMap(configMap.asJava)
-
-  lazy val uaa = {
-    val uaa = new CloudFoundryUAAOAuth2Authenticator
-    uaa.init(configString, system.dispatcher)
-    uaa
-  }
-
-  it should "generate the correct authorization request" in {
-    val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap
-    assert(parameters("response_type") == "code")
-    assert(parameters("client_id") == configMap("clientid"))
-    assert(parameters("redirect_uri") == configMap("callback"))
-    assert(parameters("scope") == "openid,cloud_controller.read")
-  }
-
-  it should "authenticate the authorization code and return the correct profile" in {
-    val code = Map("code" -> "QGGVeA")
-    val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
-    val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
-    val mail = "test@gearpump.io"
-
-    def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
-      assert(request.getHeader("Authorization").get.value() ==
-        "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
-      assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
-
-      val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
-      val form = Uri./.withQuery(Query(body)).query().toMap
-
-      assert(form("grant_type") == "authorization_code")
-      assert(form("code") == "QGGVeA")
-      assert(form("response_type") == "token")
-      assert(form("redirect_uri") == configMap("callback"))
-
-      val response =
-        s"""
-        |{
-        |  "access_token": "$accessToken",
-        |  "token_type": "bearer",
-        |  "refresh_token": "$refreshToken",
-        |  "expires_in": 43199,
-        |  "scope": "openid",
-        |  "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
-        |}
-        """.stripMargin
-      HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
-    }
-
-    def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
-      assert(request.getUri().query().get("access_token").get == accessToken)
-      val response =
-        s"""
-        |{
-        |    "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
-        |    "user_name": "user",
-        |    "email": "$mail"
-        |}
-        """.stripMargin
-      HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
-    }
-
-    server.requestHandler = (request: HttpRequest) => {
-      if (request.uri.path.startsWith(Path("/oauth/token"))) {
-        accessTokenEndpoint(request)
-      } else if (request.uri.path.startsWith(Path("/userinfo"))) {
-        protectedResourceEndpoint(request)
-      } else {
-        fail("Unexpected access to " + request.uri.toString())
-      }
-    }
-
-    val userFuture = uaa.authenticate(code)
-    val user = Await.result(userFuture, 30.seconds)
-    assert(user.user == mail)
-    assert(user.permissionLevel == Authenticator.User.permissionLevel)
-  }
-
-  override def cleanUp(): Unit = {
-    server.stop()
-    uaa.close()
-    super.cleanUp()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
deleted file mode 100644
index 70d8bb0..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpEntity.Strict
-import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.testkit.ScalatestRouteTest
-import com.typesafe.config.ConfigFactory
-import org.scalatest.FlatSpec
-
-import io.gearpump.security.Authenticator
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator
-import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator
-
-class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
-
-  implicit val actorSystem: ActorSystem = system
-  private val server = new MockOAuth2Server(system, null)
-  server.start()
-  private val serverHost = s"http://127.0.0.1:${server.port}"
-
-  val configMap = Map(
-    "class" -> "io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator",
-    "callback" -> s"$serverHost/login/oauth2/google/callback",
-    "clientid" -> "170234147043-a1tag68jtq6ab4bi11jvsj7vbaqcmhkt.apps.googleusercontent.com",
-    "clientsecret" -> "ioeWLLDipz2S7aTDXym2-obe",
-    "default-userrole" -> "guest",
-    "icon" -> "/icons/google.png")
-
-  val configString = ConfigFactory.parseMap(configMap.asJava)
-
-  private lazy val google = {
-    val google = new MockGoogleAuthenticator(serverHost)
-    google.init(configString, system.dispatcher)
-    google
-  }
-
-  it should "generate the correct authorization request" in {
-    val parameters = Uri(google.getAuthorizationUrl()).query().toMap
-    assert(parameters("response_type") == "code")
-    assert(parameters("client_id") == configMap("clientid"))
-    assert(parameters("redirect_uri") == configMap("callback"))
-    assert(parameters("scope") == GoogleOAuth2Authenticator.Scope)
-  }
-
-  it should "authenticate the authorization code and return the correct profile" in {
-    val code = Map("code" -> "4/PME0pfxjiBA42SukR-OTGl7fpFzTWzvZPf1TbkpXL4M#")
-    val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
-    val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
-    val mail = "test@gearpump.io"
-
-    def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
-
-      assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
-
-      val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
-      val form = Uri./.withQuery(Query(body)).query().toMap
-
-      assert(form("client_id") == configMap("clientid"))
-      assert(form("client_secret") == configMap("clientsecret"))
-      assert(form("grant_type") == "authorization_code")
-      assert(form("code") == code("code"))
-      assert(form("redirect_uri") == configMap("callback"))
-      assert(form("scope") == GoogleOAuth2Authenticator.Scope)
-
-      // scalastyle:off line.size.limit
-      val response =
-        s"""
-        |{
-        | "access_token": "$accessToken",
-        | "token_type": "Bearer",
-        | "expires_in": 3591,
-        | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3"
-        |}
-        """.stripMargin
-      // scalastyle:on line.size.limit
-
-      HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
-    }
-
-    def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
-      assert(request.getUri().query().get("access_token").get == accessToken)
-      val response =
-        s"""
-        |{
-        |   "kind": "plus#person",
-        |   "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8",
-        |   "nickname": "gearpump",
-        |   "gender": "female",
-        |   "emails": [
-        |     {
-        |       "value": "$mail",
-        |       "type": "account"
-        |     }
-        |   ]
-        | }
-        """.stripMargin
-      HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
-    }
-
-    server.requestHandler = (request: HttpRequest) => {
-      if (request.uri.path.startsWith(Path("/oauth2/v4/token"))) {
-        accessTokenEndpoint(request)
-      } else if (request.uri.path.startsWith(Path("/plus/v1/people/me"))) {
-        protectedResourceEndpoint(request)
-      } else {
-        fail("Unexpected access to " + request.uri.toString())
-      }
-    }
-
-    val userFuture = google.authenticate(code)
-    val user = Await.result(userFuture, 30.seconds)
-    assert(user.user == mail)
-    assert(user.permissionLevel == Authenticator.Guest.permissionLevel)
-  }
-
-  override def cleanUp(): Unit = {
-    server.stop()
-    google.close()
-    super.cleanUp()
-  }
-}
-
-object GoogleOAuth2AuthenticatorSpec {
-  class MockGoogleAuthenticator(host: String) extends GoogleOAuth2Authenticator {
-    protected override def authorizeUrl: String = {
-      super.authorizeUrl.replace("https://accounts.google.com", host)
-    }
-
-    protected override def accessTokenEndpoint: String = {
-      super.accessTokenEndpoint.replace("https://www.googleapis.com", host)
-    }
-
-    protected override def protectedResourceUrl: String = {
-      super.protectedResourceUrl.replace("https://www.googleapis.com", host)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
deleted file mode 100644
index c03532d..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Sink
-
-import io.gearpump.util.Util
-// NOTE: This cannot be removed!!
-import io.gearpump.services.util.UpickleUtil._
-
-/**
- * Serves as a fake OAuth2 server.
- */
-class MockOAuth2Server(
-    actorSystem: ActorSystem,
-    var requestHandler: HttpRequest => HttpResponse) {
-
-  implicit val system: ActorSystem = actorSystem
-  implicit val materializer = ActorMaterializer()
-  implicit val ec = system.dispatcher
-
-  private var _port: Int = 0
-  private var bindingFuture: Future[ServerBinding] = null
-
-  def port: Int = _port
-
-  def start(): Unit = {
-    _port = Util.findFreePort().get
-
-    val serverSource = Http().bind(interface = "127.0.0.1", port = _port)
-    bindingFuture = {
-      serverSource.to(Sink.foreach { connection =>
-        connection handleWithSyncHandler requestHandler
-      }).run()
-    }
-  }
-
-  def stop(): Unit = {
-    import scala.concurrent.duration._
-    Await.result(bindingFuture.map(_.unbind()), 120.seconds)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
deleted file mode 100644
index b074813..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.util
-
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import upickle.default.{read, write}
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.metrics.Metrics.{Counter, MetricType}
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
-import io.gearpump.util.Graph
-
-class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
-  "UserConfig" should "serialize and deserialize with upickle correctly" in {
-    val conf = UserConfig.empty.withString("key", "value")
-    val serialized = write(conf)
-    val deserialized = read[UserConfig](serialized)
-    assert(deserialized.getString("key") == Some("value"))
-  }
-
-  "Graph" should "be able to serialize/deserialize correctly" in {
-    val graph = new Graph[Int, String](List(0, 1), List((0, "edge", 1)))
-    val serialized = write(graph)
-
-    val deserialized = read[Graph[Int, String]](serialized)
-
-    graph.vertices.toSet shouldBe deserialized.vertices.toSet
-    graph.edges.toSet shouldBe deserialized.edges.toSet
-  }
-
-  "MetricType" should "be able to serialize/deserialize correctly" in {
-    val metric: MetricType = Counter("counter", 100L)
-    val serialized = write(metric)
-    val deserialized = read[MetricType](serialized)
-    metric shouldBe deserialized
-  }
-
-  "StreamingAppMasterDataDetail" should "serialize and deserialize with upickle correctly" in {
-    val app = new StreamAppMasterSummary(appId = 0,
-      processors = Map.empty[ProcessorId, ProcessorSummary],
-      processorLevels = Map.empty[ProcessorId, Int]
-    )
-
-    val serialized = write(app)
-    val deserialized = read[StreamAppMasterSummary](serialized)
-    assert(deserialized == app)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala
new file mode 100644
index 0000000..b7f294c
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class AdminServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+  override def testConfig: Config = TestUtil.DEFAULT_CONFIG
+
+  implicit def actorSystem: ActorSystem = system
+
+  it should "shutdown the ActorSystem when receiving terminate" in {
+    val route = new AdminService(actorSystem).route
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Post(s"/terminate") ~> route) ~> check {
+      assert(status.intValue() == 404)
+    }
+
+    Await.result(actorSystem.whenTerminated, 20.seconds)
+
+    // terminate should terminate current actor system
+    assert(actorSystem.whenTerminated.isCompleted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
new file mode 100644
index 0000000..2ece554
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
+import akka.actor.ActorRef
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.testkit.TestActor.{AutoPilot, KeepRunning}
+import akka.testkit.{TestKit, TestProbe}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.slf4j.Logger
+import upickle.default.read
+
+import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
+import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
+import org.apache.gearpump.util.LogUtil
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
+  with Matchers with BeforeAndAfterAll {
+
+  override def testConfig: Config = TestUtil.UI_CONFIG
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private def actorRefFactory = system
+
+  val mockAppMaster = TestProbe()
+  val failure = LastFailure(System.currentTimeMillis(), "Some error")
+
+  lazy val jarStoreService = JarStoreService.get(system.settings.config)
+
+  def jarStore: JarStoreService = jarStoreService
+
+  private def master = mockMaster.ref
+
+  private def appMasterRoute = new AppMasterService(master, jarStore, system).route
+
+  mockAppMaster.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case AppMasterDataDetailRequest(appId) =>
+          sender ! GeneralAppMasterSummary(appId)
+          KeepRunning
+        case QueryHistoryMetrics(path, _, _, _) =>
+          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+          KeepRunning
+        case GetLastFailure(appId) =>
+          sender ! failure
+          KeepRunning
+        case GetExecutorSummary(0) =>
+          sender ! ExecutorSummary.empty
+          KeepRunning
+        case QueryExecutorConfig(0) =>
+          sender ! ExecutorConfig(system.settings.config)
+          KeepRunning
+      }
+    }
+  }
+
+  val mockMaster = TestProbe()
+  mockMaster.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case ResolveAppId(0) =>
+          sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
+          KeepRunning
+        case AppMasterDataRequest(appId, _) =>
+          sender ! AppMasterData("active")
+          KeepRunning
+        case QueryAppMasterConfig(appId) =>
+          sender ! AppMasterConfig(null)
+          KeepRunning
+      }
+    }
+  }
+
+  "AppMasterService" should "return a JSON structure for GET request when detail = false" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check {
+      val responseBody = responseAs[String]
+      read[AppMasterData](responseBody)
+
+      // Checks the header; it should contain the no-cache directive.
+      // Cache-Control:no-cache, max-age=0
+      val noCache = header[`Cache-Control`].get.value()
+      assert(noCache == "no-cache, max-age=0")
+    }
+
+    Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check {
+      val responseBody = responseAs[String]
+    }
+  }
+
+  "MetricsQueryService" should "return history metrics" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/appmaster/0/metrics/processor") ~> appMasterRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+    }
+  }
+
+  "AppMaster" should "return lastest error" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/appmaster/0/errors") ~> appMasterRoute) ~> check {
+      val responseBody = responseAs[String]
+      assert(read[LastFailure](responseBody) == failure)
+    }
+  }
+
+  "ConfigQueryService" should "return config for application" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+    }
+  }
+
+  it should "return config for executor " in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+    }
+  }
+
+  it should "return return executor summary" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check {
+      val responseBody = responseAs[String]
+      val executorSummary = read[ExecutorSummary](responseBody)
+      assert(executorSummary.id == 0)
+    }
+  }
+
+  override def afterAll {
+    TestKit.shutdownActorSystem(system)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
new file mode 100644
index 0000000..e365e9f
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import java.io.File
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Success, Try}
+
+import akka.actor.ActorRef
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.stream.scaladsl.{FileIO, Source}
+import akka.testkit.TestActor.{AutoPilot, KeepRunning}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
+// NOTE: Do not remove! Presumably provides the implicit upickle readers/writers used below.
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.util.Graph
+
+class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
+  with Matchers with BeforeAndAfterAll {
+  import upickle.default.{read, write}
+
+  override def testConfig: Config = TestUtil.UI_CONFIG
+
+  private def actorRefFactory = system
+  val workerId = 0
+  val mockWorker = TestProbe()
+
+  lazy val jarStoreService = JarStoreService.get(system.settings.config)
+
+  private def master = mockMaster.ref
+
+  def jarStore: JarStoreService = jarStoreService
+
+  private def masterRoute = new MasterService(master, jarStore, system).route
+
+  mockWorker.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case GetWorkerData(workerId) =>
+          sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId))
+          KeepRunning
+        case QueryHistoryMetrics(path, _, _, _) =>
+          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+          KeepRunning
+      }
+    }
+  }
+
+  val mockMaster = TestProbe()
+  mockMaster.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case GetMasterData =>
+          sender ! MasterData(null)
+          KeepRunning
+        case AppMastersDataRequest =>
+          sender ! AppMastersData(List.empty[AppMasterData])
+          KeepRunning
+        case GetAllWorkers =>
+          sender ! WorkerList(List(WorkerId(0, 0L)))
+          KeepRunning
+        case ResolveWorkerId(WorkerId(0, 0L)) =>
+          sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
+          KeepRunning
+        case QueryHistoryMetrics(path, _, _, _) =>
+          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+          KeepRunning
+        case QueryMasterConfig =>
+          sender ! MasterConfig(null)
+          KeepRunning
+        case submit: SubmitApplication =>
+          sender ! SubmitApplicationResult(Success(0))
+          KeepRunning
+      }
+    }
+  }
+
+  it should "return master info when asked" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check {
+      // Checks the type
+      val content = responseAs[String]
+      read[MasterData](content)
+
+      // Checks the header; it should contain the no-cache directive.
+      // Cache-Control:no-cache, max-age=0
+      val noCache = header[`Cache-Control`].get.value()
+      assert(noCache == "no-cache, max-age=0")
+    }
+
+    mockMaster.expectMsg(GetMasterData)
+  }
+
+  it should "return a json structure of appMastersData for GET request" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check {
+      // Checks the type
+      read[AppMastersData](responseAs[String])
+    }
+    mockMaster.expectMsg(AppMastersDataRequest)
+  }
+
+  it should "return a json structure of worker data for GET request" in {
+    implicit val customTimeout = RouteTestTimeout(25.seconds)
+    Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
+      // Checks the type
+      val workerListJson = responseAs[String]
+      val workers = read[List[WorkerSummary]](workerListJson)
+      assert(workers.size > 0)
+      workers.foreach { worker =>
+        worker.state shouldBe "active"
+      }
+    }
+    mockMaster.expectMsg(GetAllWorkers)
+    mockMaster.expectMsgType[ResolveWorkerId]
+    mockWorker.expectMsgType[GetWorkerData]
+  }
+
+  it should "return config for master" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+    }
+    mockMaster.expectMsg(QueryMasterConfig)
+  }
+
+  "submit invalid application" should "return an error" in {
+    implicit val routeTestTimeout = RouteTestTimeout(30.second)
+    val tempfile = new File("foo")
+    val request = entity(tempfile)
+
+    Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check {
+      assert(response.status.intValue == 500)
+    }
+  }
+
+  private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
+    val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+      FileIO.fromFile(file, chunkSize = 100000))
+
+    val body = Source.single(
+      Multipart.FormData.BodyPart(
+        "file",
+        entity,
+        Map("filename" -> file.getName)))
+    val form = Multipart.FormData(body)
+
+    Marshal(form).to[RequestEntity]
+  }
+
+  "MetricsQueryService" should "return history metrics" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+    }
+  }
+
+  "submitDag" should "submit a SubmitApplicationRequest and get an appId >= 0" in {
+    import org.apache.gearpump.util.Graph._
+    val processors = Map(
+      0 -> ProcessorDescription(0, "A", parallelism = 1),
+      1 -> ProcessorDescription(1, "B", parallelism = 1)
+    )
+    val dag = Graph(0 ~ "partitioner" ~> 1)
+    val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
+    Post(s"/api/$REST_VERSION/master/submitdag",
+      HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
+      val responseBody = responseAs[String]
+      val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
+      assert(submitApplicationResultValue.appId >= 0, "invalid appid")
+    }
+  }
+
+  "MasterService" should "return Gearpump built-in partitioner list" in {
+    (Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute) ~> check {
+      val responseBody = responseAs[String]
+      val partitioners = read[BuiltinPartitioners](responseBody)
+      assert(partitioners.partitioners.length > 0, "invalid response")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala
new file mode 100644
index 0000000..c8bfb29
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{AuthorizationFailedRejection, _}
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class SecurityServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+  override def testConfig: Config = TestUtil.UI_CONFIG
+
+  implicit def actorSystem: ActorSystem = system
+
+  it should "return 401 if not authenticated" in {
+    val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
+
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+
+    (Get(s"/resource") ~> security.route) ~> check {
+      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+    }
+  }
+
+  "guest" should "get protected resource after authentication" in {
+    val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
+
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+
+    var cookie: HttpCookiePair = null
+    (Post(s"/login", FormData("username" -> "guest", "password" -> "guest"))
+      ~> security.route) ~> check {
+      assert("{\"user\":\"guest\"}" == responseAs[String])
+      assert(status.intValue() == 200)
+      assert(header[`Set-Cookie`].isDefined)
+      val httpCookie = header[`Set-Cookie`].get.cookie
+      assert(httpCookie.name == "gearpump_token")
+      cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
+    }
+
+    // After authentication, everything is fine.
+    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+      responseAs[String] shouldEqual "OK"
+    }
+
+    // However, guest cannot access high-permission operations, like POST.
+    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+      assert(rejection == AuthorizationFailedRejection)
+    }
+
+    // Logout, should clear the session
+    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
+      assert("{\"user\":\"guest\"}" == responseAs[String])
+      assert(status.intValue() == 200)
+      assert(header[`Set-Cookie`].isDefined)
+      val httpCookie = header[`Set-Cookie`].get.cookie
+      assert(httpCookie.name == "gearpump_token")
+      assert(httpCookie.value == "deleted")
+    }
+
+    // Access again, rejected this time.
+    Get("/resource") ~> security.route ~> check {
+      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+    }
+
+    Post("/resource") ~> security.route ~> check {
+      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+    }
+  }
+
+  "admin" should "get protected resource after authentication" in {
+    val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
+
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+
+    var cookie: HttpCookiePair = null
+    (Post(s"/login", FormData("username" -> "admin", "password" -> "admin"))
+      ~> security.route) ~> check {
+      assert("{\"user\":\"admin\"}" == responseAs[String])
+      assert(status.intValue() == 200)
+      assert(header[`Set-Cookie`].isDefined)
+      val httpCookie = header[`Set-Cookie`].get.cookie
+      assert(httpCookie.name == "gearpump_token")
+      cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
+    }
+
+    // After authentication, everything is fine.
+    Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+      responseAs[String] shouldEqual "OK"
+    }
+
+    // Unlike guest, admin can also access POST.
+    Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+      responseAs[String] shouldEqual "OK"
+    }
+
+    // Logout, should clear the session
+    Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
+      assert("{\"user\":\"admin\"}" == responseAs[String])
+      assert(status.intValue() == 200)
+      assert(header[`Set-Cookie`].isDefined)
+      val httpCookie = header[`Set-Cookie`].get.cookie
+      assert(httpCookie.name == "gearpump_token")
+      assert(httpCookie.value == "deleted")
+    }
+
+    // Access again, rejected this time.
+    Get("/resource") ~> security.route ~> check {
+      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+    }
+
+    Post("/resource") ~> security.route ~> check {
+      assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+    }
+  }
+}
+
+object SecurityServiceSpec {
+
+  val resource = new RouteService {
+    override def route: Route = {
+      get {
+        path("resource") {
+          complete("OK")
+        }
+      } ~
+      post {
+        path("resource") {
+          complete("OK")
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala
new file mode 100644
index 0000000..33f0866
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+import scala.util.Try
+
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.util.Constants
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class StaticServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+  override def testConfig: Config = TestUtil.UI_CONFIG
+  private val supervisorPath = system.settings.config.getString(
+    Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
+
+  protected def route = new StaticService(system, supervisorPath).route
+
+  it should "return version" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/version") ~> route) ~> check {
+      val responseBody = responseAs[String]
+      // The body is the bare version string, so it is compared directly.
+      assert(responseBody == "Unknown-Version")
+
+      // By default, it will be cached.
+      assert(header[`Cache-Control`].isEmpty)
+    }
+  }
+
+  it should "get correct supervisor path" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/supervisor-actor-path") ~> route) ~> check {
+      val responseBody = responseAs[String]
+      val defaultSupervisorPath = ""
+      assert(responseBody == defaultSupervisorPath)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
new file mode 100644
index 0000000..4658c98
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
+import akka.actor.ActorRef
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.testkit.TestActor.{AutoPilot, KeepRunning}
+import akka.testkit.{TestKit, TestProbe}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.jarstore.JarStoreService
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class WorkerServiceSpec
+  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+  override def testConfig: Config = TestUtil.DEFAULT_CONFIG
+
+  protected def actorRefFactory = system
+
+  val mockWorker = TestProbe()
+
+  protected def master = mockMaster.ref
+
+  lazy val jarStoreService = JarStoreService.get(system.settings.config)
+
+  protected def workerRoute = new WorkerService(master, system).route
+
+  mockWorker.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case GetWorkerData(workerId) =>
+          sender ! WorkerData(WorkerSummary.empty)
+          KeepRunning
+        case QueryWorkerConfig(workerId) =>
+          sender ! WorkerConfig(null)
+          KeepRunning
+        case QueryHistoryMetrics(path, _, _, _) =>
+          sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+          KeepRunning
+      }
+    }
+  }
+
+  val mockMaster = TestProbe()
+  mockMaster.setAutoPilot {
+    new AutoPilot {
+      def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+        case ResolveWorkerId(workerId) =>
+          sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
+          KeepRunning
+      }
+    }
+  }
+
+  "ConfigQueryService" should "return config for worker" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config")
+      ~> workerRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+    }
+  }
+
+  it should "return WorkerData" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}")
+      ~> workerRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+
+      // Check the header; it should contain the no-cache directive.
+      // Cache-Control:no-cache, max-age=0
+      val noCache = header[`Cache-Control`].get.value()
+      assert(noCache == "no-cache, max-age=0")
+    }
+  }
+
+  "MetricsQueryService" should "return history metrics" in {
+    implicit val customTimeout = RouteTestTimeout(15.seconds)
+    (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker")
+      ~> workerRoute) ~> check {
+      val responseBody = responseAs[String]
+      val config = Try(ConfigFactory.parseString(responseBody))
+      assert(config.isSuccess)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system) // stop the test ActorSystem after all tests have run
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
new file mode 100644
index 0000000..fef581e
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpEntity.Strict
+import akka.http.scaladsl.model.MediaTypes._
+import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import com.typesafe.config.ConfigFactory
+import org.scalatest.FlatSpec
+
+import org.apache.gearpump.security.Authenticator
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator
+
+class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
+
+  implicit val actorSystem: ActorSystem = system
+  private val server = new MockOAuth2Server(system, null)
+  server.start()
+  private val serverHost = s"http://127.0.0.1:${server.port}"
+
+  val configMap = Map(
+    "class" ->
+      "org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator",
+    "callback" -> s"$serverHost/login/oauth2/cloudfoundryuaa/callback",
+    "clientid" -> "gearpump_test2",
+    "clientsecret" -> "gearpump_test2",
+    "default-userrole" -> "user",
+    "icon" -> "/icons/uaa.png",
+    "uaahost" -> serverHost,
+    "additional-authenticator-enabled" -> "false")
+
+  val configString = ConfigFactory.parseMap(configMap.asJava)
+
+  lazy val uaa = {
+    val uaa = new CloudFoundryUAAOAuth2Authenticator
+    uaa.init(configString, system.dispatcher)
+    uaa
+  }
+
+  it should "generate the correct authorization request" in {
+    val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap
+    assert(parameters("response_type") == "code")
+    assert(parameters("client_id") == configMap("clientid"))
+    assert(parameters("redirect_uri") == configMap("callback"))
+    assert(parameters("scope") == "openid,cloud_controller.read")
+  }
+
+  it should "authenticate the authorization code and return the correct profile" in {
+    val code = Map("code" -> "QGGVeA")
+    val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
+    val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
+    val mail = "test@gearpump.io"
+
+    def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
+      assert(request.getHeader("Authorization").get.value() ==
+        "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
+      assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
+
+      val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
+      val form = Uri./.withQuery(Query(body)).query().toMap
+
+      assert(form("grant_type") == "authorization_code")
+      assert(form("code") == "QGGVeA")
+      assert(form("response_type") == "token")
+      assert(form("redirect_uri") == configMap("callback"))
+
+      val response =
+        s"""
+        |{
+        |  "access_token": "$accessToken",
+        |  "token_type": "bearer",
+        |  "refresh_token": "$refreshToken",
+        |  "expires_in": 43199,
+        |  "scope": "openid",
+        |  "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
+        |}
+        """.stripMargin
+      HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
+    }
+
+    def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
+      assert(request.getUri().query().get("access_token").get == accessToken)
+      val response =
+        s"""
+        |{
+        |    "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
+        |    "user_name": "user",
+        |    "email": "$mail"
+        |}
+        """.stripMargin
+      HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
+    }
+
+    server.requestHandler = (request: HttpRequest) => {
+      if (request.uri.path.startsWith(Path("/oauth/token"))) {
+        accessTokenEndpoint(request)
+      } else if (request.uri.path.startsWith(Path("/userinfo"))) {
+        protectedResourceEndpoint(request)
+      } else {
+        fail("Unexpected access to " + request.uri.toString())
+      }
+    }
+
+    val userFuture = uaa.authenticate(code)
+    val user = Await.result(userFuture, 30.seconds)
+    assert(user.user == mail)
+    assert(user.permissionLevel == Authenticator.User.permissionLevel)
+  }
+
+  override def cleanUp(): Unit = {
+    server.stop()
+    uaa.close()
+    super.cleanUp()
+  }
+}