You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:23 UTC
[13/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
deleted file mode 100644
index 59da80d..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-
-import akka.actor.ActorRef
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.testkit.TestActor.{AutoPilot, KeepRunning}
-import akka.testkit.{TestKit, TestProbe}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.slf4j.Logger
-import upickle.default.read
-
-import io.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
-import io.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
-import io.gearpump.cluster.MasterToClient._
-import io.gearpump.cluster.TestUtil
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
-import io.gearpump.util.LogUtil
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
- with Matchers with BeforeAndAfterAll {
-
- override def testConfig: Config = TestUtil.UI_CONFIG
-
- private val LOG: Logger = LogUtil.getLogger(getClass)
- private def actorRefFactory = system
-
- val mockAppMaster = TestProbe()
- val failure = LastFailure(System.currentTimeMillis(), "Some error")
-
- lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
- def jarStore: JarStoreService = jarStoreService
-
- private def master = mockMaster.ref
-
- private def appMasterRoute = new AppMasterService(master, jarStore, system).route
-
- mockAppMaster.setAutoPilot {
- new AutoPilot {
- def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
- case AppMasterDataDetailRequest(appId) =>
- sender ! GeneralAppMasterSummary(appId)
- KeepRunning
- case QueryHistoryMetrics(path, _, _, _) =>
- sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
- KeepRunning
- case GetLastFailure(appId) =>
- sender ! failure
- KeepRunning
- case GetExecutorSummary(0) =>
- sender ! ExecutorSummary.empty
- KeepRunning
- case QueryExecutorConfig(0) =>
- sender ! ExecutorConfig(system.settings.config)
- KeepRunning
- }
- }
- }
-
- val mockMaster = TestProbe()
- mockMaster.setAutoPilot {
- new AutoPilot {
- def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
- case ResolveAppId(0) =>
- sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
- KeepRunning
- case AppMasterDataRequest(appId, _) =>
- sender ! AppMasterData("active")
- KeepRunning
- case QueryAppMasterConfig(appId) =>
- sender ! AppMasterConfig(null)
- KeepRunning
- }
- }
- }
-
- "AppMasterService" should "return a JSON structure for GET request when detail = false" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check {
- val responseBody = responseAs[String]
- read[AppMasterData](responseBody)
-
- // Checks the header, should contains no-cache header.
- // Cache-Control:no-cache, max-age=0
- val noCache = header[`Cache-Control`].get.value()
- assert(noCache == "no-cache, max-age=0")
- }
-
- Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check {
- val responseBody = responseAs[String]
- }
- }
-
- "MetricsQueryService" should "return history metrics" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/metrics/processor") ~> appMasterRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
- }
- }
-
- "AppMaster" should "return lastest error" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/errors") ~> appMasterRoute) ~> check {
- val responseBody = responseAs[String]
- assert(read[LastFailure](responseBody) == failure)
- }
- }
-
- "ConfigQueryService" should "return config for application" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
- }
- }
-
- it should "return config for executor " in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
- }
- }
-
- it should "return return executor summary" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check {
- val responseBody = responseAs[String]
- val executorSummary = read[ExecutorSummary](responseBody)
- assert(executorSummary.id == 0)
- }
- }
-
- override def afterAll {
- TestKit.shutdownActorSystem(system)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
deleted file mode 100644
index a24db18..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import java.io.File
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Success, Try}
-
-import akka.actor.ActorRef
-import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.stream.scaladsl.{FileIO, Source}
-import akka.testkit.TestActor.{AutoPilot, KeepRunning}
-import akka.testkit.TestProbe
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
-import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
-import io.gearpump.cluster.MasterToClient._
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.streaming.ProcessorDescription
-import io.gearpump.util.Graph
-
-class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
- with Matchers with BeforeAndAfterAll {
- import upickle.default.{read, write}
-
- override def testConfig: Config = TestUtil.UI_CONFIG
-
- private def actorRefFactory = system
- val workerId = 0
- val mockWorker = TestProbe()
-
- lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
- private def master = mockMaster.ref
-
- def jarStore: JarStoreService = jarStoreService
-
- private def masterRoute = new MasterService(master, jarStore, system).route
-
- mockWorker.setAutoPilot {
- new AutoPilot {
- def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
- case GetWorkerData(workerId) =>
- sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId))
- KeepRunning
- case QueryHistoryMetrics(path, _, _, _) =>
- sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
- KeepRunning
- }
- }
- }
-
- val mockMaster = TestProbe()
- mockMaster.setAutoPilot {
- new AutoPilot {
- def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
- case GetMasterData =>
- sender ! MasterData(null)
- KeepRunning
- case AppMastersDataRequest =>
- sender ! AppMastersData(List.empty[AppMasterData])
- KeepRunning
- case GetAllWorkers =>
- sender ! WorkerList(List(WorkerId(0, 0L)))
- KeepRunning
- case ResolveWorkerId(WorkerId(0, 0L)) =>
- sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
- KeepRunning
- case QueryHistoryMetrics(path, _, _, _) =>
- sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
- KeepRunning
- case QueryMasterConfig =>
- sender ! MasterConfig(null)
- KeepRunning
- case submit: SubmitApplication =>
- sender ! SubmitApplicationResult(Success(0))
- KeepRunning
- }
- }
- }
-
- it should "return master info when asked" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check {
- // Checks the type
- val content = responseAs[String]
- read[MasterData](content)
-
- // Checks the header, should contains no-cache header.
- // Cache-Control:no-cache, max-age=0
- val noCache = header[`Cache-Control`].get.value()
- assert(noCache == "no-cache, max-age=0")
- }
-
- mockMaster.expectMsg(GetMasterData)
- }
-
- it should "return a json structure of appMastersData for GET request" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check {
- // Checks the type
- read[AppMastersData](responseAs[String])
- }
- mockMaster.expectMsg(AppMastersDataRequest)
- }
-
- it should "return a json structure of worker data for GET request" in {
- implicit val customTimeout = RouteTestTimeout(25.seconds)
- Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
- // Checks the type
- val workerListJson = responseAs[String]
- val workers = read[List[WorkerSummary]](workerListJson)
- assert(workers.size > 0)
- workers.foreach { worker =>
- worker.state shouldBe "active"
- }
- }
- mockMaster.expectMsg(GetAllWorkers)
- mockMaster.expectMsgType[ResolveWorkerId]
- mockWorker.expectMsgType[GetWorkerData]
- }
-
- it should "return config for master" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
- }
- mockMaster.expectMsg(QueryMasterConfig)
- }
-
- "submit invalid application" should "return an error" in {
- implicit val routeTestTimeout = RouteTestTimeout(30.second)
- val tempfile = new File("foo")
- val request = entity(tempfile)
-
- Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check {
- assert(response.status.intValue == 500)
- }
- }
-
- private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
- val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
- FileIO.fromFile(file, chunkSize = 100000))
-
- val body = Source.single(
- Multipart.FormData.BodyPart(
- "file",
- entity,
- Map("filename" -> file.getName)))
- val form = Multipart.FormData(body)
-
- Marshal(form).to[RequestEntity]
- }
-
- "MetricsQueryService" should "return history metrics" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
- }
- }
-
- "submitDag" should "submit a SubmitApplicationRequest and get an appId > 0" in {
- import io.gearpump.util.Graph._
- val processors = Map(
- 0 -> ProcessorDescription(0, "A", parallelism = 1),
- 1 -> ProcessorDescription(1, "B", parallelism = 1)
- )
- val dag = Graph(0 ~ "partitioner" ~> 1)
- val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
- Post(s"/api/$REST_VERSION/master/submitdag",
- HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
- val responseBody = responseAs[String]
- val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
- assert(submitApplicationResultValue.appId >= 0, "invalid appid")
- }
- }
-
- "MasterService" should "return Gearpump built-in partitioner list" in {
- (Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute) ~> check {
- val responseBody = responseAs[String]
- val partitioners = read[BuiltinPartitioners](responseBody)
- assert(partitioners.partitioners.length > 0, "invalid response")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
deleted file mode 100644
index 3cf99e9..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.FormData
-import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.{AuthorizationFailedRejection, _}
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class SecurityServiceSpec
- extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
- override def testConfig: Config = TestUtil.UI_CONFIG
-
- implicit def actorSystem: ActorSystem = system
-
- it should "return 401 if not authenticated" in {
- val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
-
- implicit val customTimeout = RouteTestTimeout(15.seconds)
-
- (Get(s"/resource") ~> security.route) ~> check {
- assert(rejection.isInstanceOf[AuthenticationFailedRejection])
- }
- }
-
- "guest" should "get protected resource after authentication" in {
- val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
-
- implicit val customTimeout = RouteTestTimeout(15.seconds)
-
- var cookie: HttpCookiePair = null
- (Post(s"/login", FormData("username" -> "guest", "password" -> "guest"))
- ~> security.route) ~> check {
- assert("{\"user\":\"guest\"}" == responseAs[String])
- assert(status.intValue() == 200)
- assert(header[`Set-Cookie`].isDefined)
- val httpCookie = header[`Set-Cookie`].get.cookie
- assert(httpCookie.name == "gearpump_token")
- cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
- }
-
- // After authentication, everything is fine.
- Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
- responseAs[String] shouldEqual "OK"
- }
-
- // However, guest cannot access high-permission operations, like POST.
- Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
- assert(rejection == AuthorizationFailedRejection)
- }
-
- // Logout, should clear the session
- Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
- assert("{\"user\":\"guest\"}" == responseAs[String])
- assert(status.intValue() == 200)
- assert(header[`Set-Cookie`].isDefined)
- val httpCookie = header[`Set-Cookie`].get.cookie
- assert(httpCookie.name == "gearpump_token")
- assert(httpCookie.value == "deleted")
- }
-
- // Access again, rejected this time.
- Get("/resource") ~> security.route ~> check {
- assert(rejection.isInstanceOf[AuthenticationFailedRejection])
- }
-
- Post("/resource") ~> security.route ~> check {
- assert(rejection.isInstanceOf[AuthenticationFailedRejection])
- }
- }
-
- "admin" should "get protected resource after authentication" in {
- val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
-
- implicit val customTimeout = RouteTestTimeout(15.seconds)
-
- var cookie: HttpCookiePair = null
- (Post(s"/login", FormData("username" -> "admin", "password" -> "admin"))
- ~> security.route) ~> check {
- assert("{\"user\":\"admin\"}" == responseAs[String])
- assert(status.intValue() == 200)
- assert(header[`Set-Cookie`].isDefined)
- val httpCookie = header[`Set-Cookie`].get.cookie
- assert(httpCookie.name == "gearpump_token")
- cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
- }
-
- // After authentication, everything is fine.
- Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
- responseAs[String] shouldEqual "OK"
- }
-
- // Not like guest, admimn can also access POST
- Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
- responseAs[String] shouldEqual "OK"
- }
-
- // Logout, should clear the session
- Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
- assert("{\"user\":\"admin\"}" == responseAs[String])
- assert(status.intValue() == 200)
- assert(header[`Set-Cookie`].isDefined)
- val httpCookie = header[`Set-Cookie`].get.cookie
- assert(httpCookie.name == "gearpump_token")
- assert(httpCookie.value == "deleted")
- }
-
- // Access again, rejected this time.
- Get("/resource") ~> security.route ~> check {
- assert(rejection.isInstanceOf[AuthenticationFailedRejection])
- }
-
- Post("/resource") ~> security.route ~> check {
- assert(rejection.isInstanceOf[AuthenticationFailedRejection])
- }
- }
-}
-
-object SecurityServiceSpec {
-
- val resource = new RouteService {
- override def route: Route = {
- get {
- path("resource") {
- complete("OK")
- }
- } ~
- post {
- path("resource") {
- complete("OK")
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
deleted file mode 100644
index f61e2f5..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-import scala.util.Try
-
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.util.Constants
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class StaticServiceSpec
- extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
- override def testConfig: Config = TestUtil.UI_CONFIG
- private val supervisorPath = system.settings.config.getString(
- Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
-
- protected def route = new StaticService(system, supervisorPath).route
-
- it should "return version" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/version") ~> route) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(responseBody == "Unknown-Version")
-
- // By default, it will be cached.
- assert(header[`Cache-Control`].isEmpty)
- }
- }
-
- it should "get correct supervisor path" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/supervisor-actor-path") ~> route) ~> check {
- val responseBody = responseAs[String]
- val defaultSupervisorPath = ""
- assert(responseBody == defaultSupervisorPath)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
deleted file mode 100644
index 2676a16..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-
-import akka.actor.ActorRef
-import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.testkit.TestActor.{AutoPilot, KeepRunning}
-import akka.testkit.{TestKit, TestProbe}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import io.gearpump.jarstore.JarStoreService
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class WorkerServiceSpec
- extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
- override def testConfig: Config = TestUtil.DEFAULT_CONFIG
-
- protected def actorRefFactory = system
-
- val mockWorker = TestProbe()
-
- protected def master = mockMaster.ref
-
- lazy val jarStoreService = JarStoreService.get(system.settings.config)
-
- protected def workerRoute = new WorkerService(master, system).route
-
- mockWorker.setAutoPilot {
- new AutoPilot {
- def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
- case GetWorkerData(workerId) =>
- sender ! WorkerData(WorkerSummary.empty)
- KeepRunning
- case QueryWorkerConfig(workerId) =>
- sender ! WorkerConfig(null)
- KeepRunning
- case QueryHistoryMetrics(path, _, _, _) =>
- sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
- KeepRunning
- }
- }
- }
-
- val mockMaster = TestProbe()
- mockMaster.setAutoPilot {
- new AutoPilot {
- def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
- case ResolveWorkerId(workerId) =>
- sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
- KeepRunning
- }
- }
- }
-
- "ConfigQueryService" should "return config for worker" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config")
- ~> workerRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
- }
- }
-
- it should "return WorkerData" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}")
- ~> workerRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
-
- // Check the header, should contains no-cache header.
- // Cache-Control:no-cache, max-age=0
- val noCache = header[`Cache-Control`].get.value()
- assert(noCache == "no-cache, max-age=0")
- }
- }
-
- "MetricsQueryService" should "return history metrics" in {
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker")
- ~> workerRoute) ~> check {
- val responseBody = responseAs[String]
- val config = Try(ConfigFactory.parseString(responseBody))
- assert(config.isSuccess)
- }
- }
-
- override def afterAll {
- TestKit.shutdownActorSystem(system)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
deleted file mode 100644
index 136026a..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpEntity.Strict
-import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.testkit.ScalatestRouteTest
-import com.typesafe.config.ConfigFactory
-import org.scalatest.FlatSpec
-
-import io.gearpump.security.Authenticator
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator
-
-class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
-
- implicit val actorSystem: ActorSystem = system
- private val server = new MockOAuth2Server(system, null)
- server.start()
- private val serverHost = s"http://127.0.0.1:${server.port}"
-
- val configMap = Map(
- "class" -> "io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator",
- "callback" -> s"$serverHost/login/oauth2/cloudfoundryuaa/callback",
- "clientid" -> "gearpump_test2",
- "clientsecret" -> "gearpump_test2",
- "default-userrole" -> "user",
- "icon" -> "/icons/uaa.png",
- "uaahost" -> serverHost,
- "additional-authenticator-enabled" -> "false")
-
- val configString = ConfigFactory.parseMap(configMap.asJava)
-
- lazy val uaa = {
- val uaa = new CloudFoundryUAAOAuth2Authenticator
- uaa.init(configString, system.dispatcher)
- uaa
- }
-
- it should "generate the correct authorization request" in {
- val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap
- assert(parameters("response_type") == "code")
- assert(parameters("client_id") == configMap("clientid"))
- assert(parameters("redirect_uri") == configMap("callback"))
- assert(parameters("scope") == "openid,cloud_controller.read")
- }
-
- it should "authenticate the authorization code and return the correct profile" in {
- val code = Map("code" -> "QGGVeA")
- val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
- val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
- val mail = "test@gearpump.io"
-
- def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
- assert(request.getHeader("Authorization").get.value() ==
- "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
- assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
-
- val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
- val form = Uri./.withQuery(Query(body)).query().toMap
-
- assert(form("grant_type") == "authorization_code")
- assert(form("code") == "QGGVeA")
- assert(form("response_type") == "token")
- assert(form("redirect_uri") == configMap("callback"))
-
- val response =
- s"""
- |{
- | "access_token": "$accessToken",
- | "token_type": "bearer",
- | "refresh_token": "$refreshToken",
- | "expires_in": 43199,
- | "scope": "openid",
- | "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
- |}
- """.stripMargin
- HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
- }
-
- def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
- assert(request.getUri().query().get("access_token").get == accessToken)
- val response =
- s"""
- |{
- | "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
- | "user_name": "user",
- | "email": "$mail"
- |}
- """.stripMargin
- HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
- }
-
- server.requestHandler = (request: HttpRequest) => {
- if (request.uri.path.startsWith(Path("/oauth/token"))) {
- accessTokenEndpoint(request)
- } else if (request.uri.path.startsWith(Path("/userinfo"))) {
- protectedResourceEndpoint(request)
- } else {
- fail("Unexpected access to " + request.uri.toString())
- }
- }
-
- val userFuture = uaa.authenticate(code)
- val user = Await.result(userFuture, 30.seconds)
- assert(user.user == mail)
- assert(user.permissionLevel == Authenticator.User.permissionLevel)
- }
-
- override def cleanUp(): Unit = {
- server.stop()
- uaa.close()
- super.cleanUp()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
deleted file mode 100644
index 70d8bb0..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.HttpEntity.Strict
-import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Path, Query}
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.testkit.ScalatestRouteTest
-import com.typesafe.config.ConfigFactory
-import org.scalatest.FlatSpec
-
-import io.gearpump.security.Authenticator
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator
-import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator
-
-class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
-
- implicit val actorSystem: ActorSystem = system
- private val server = new MockOAuth2Server(system, null)
- server.start()
- private val serverHost = s"http://127.0.0.1:${server.port}"
-
- val configMap = Map(
- "class" -> "io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator",
- "callback" -> s"$serverHost/login/oauth2/google/callback",
- "clientid" -> "170234147043-a1tag68jtq6ab4bi11jvsj7vbaqcmhkt.apps.googleusercontent.com",
- "clientsecret" -> "ioeWLLDipz2S7aTDXym2-obe",
- "default-userrole" -> "guest",
- "icon" -> "/icons/google.png")
-
- val configString = ConfigFactory.parseMap(configMap.asJava)
-
- private lazy val google = {
- val google = new MockGoogleAuthenticator(serverHost)
- google.init(configString, system.dispatcher)
- google
- }
-
- it should "generate the correct authorization request" in {
- val parameters = Uri(google.getAuthorizationUrl()).query().toMap
- assert(parameters("response_type") == "code")
- assert(parameters("client_id") == configMap("clientid"))
- assert(parameters("redirect_uri") == configMap("callback"))
- assert(parameters("scope") == GoogleOAuth2Authenticator.Scope)
- }
-
- it should "authenticate the authorization code and return the correct profile" in {
- val code = Map("code" -> "4/PME0pfxjiBA42SukR-OTGl7fpFzTWzvZPf1TbkpXL4M#")
- val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
- val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
- val mail = "test@gearpump.io"
-
- def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
-
- assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
-
- val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
- val form = Uri./.withQuery(Query(body)).query().toMap
-
- assert(form("client_id") == configMap("clientid"))
- assert(form("client_secret") == configMap("clientsecret"))
- assert(form("grant_type") == "authorization_code")
- assert(form("code") == code("code"))
- assert(form("redirect_uri") == configMap("callback"))
- assert(form("scope") == GoogleOAuth2Authenticator.Scope)
-
- // scalastyle:off line.size.limit
- val response =
- s"""
- |{
- | "access_token": "$accessToken",
- | "token_type": "Bearer",
- | "expires_in": 3591,
- | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3"
- |}
- """.stripMargin
- // scalastyle:on line.size.limit
-
- HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
- }
-
- def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
- assert(request.getUri().query().get("access_token").get == accessToken)
- val response =
- s"""
- |{
- | "kind": "plus#person",
- | "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8",
- | "nickname": "gearpump",
- | "gender": "female",
- | "emails": [
- | {
- | "value": "$mail",
- | "type": "account"
- | }
- | ]
- | }
- """.stripMargin
- HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
- }
-
- server.requestHandler = (request: HttpRequest) => {
- if (request.uri.path.startsWith(Path("/oauth2/v4/token"))) {
- accessTokenEndpoint(request)
- } else if (request.uri.path.startsWith(Path("/plus/v1/people/me"))) {
- protectedResourceEndpoint(request)
- } else {
- fail("Unexpected access to " + request.uri.toString())
- }
- }
-
- val userFuture = google.authenticate(code)
- val user = Await.result(userFuture, 30.seconds)
- assert(user.user == mail)
- assert(user.permissionLevel == Authenticator.Guest.permissionLevel)
- }
-
- override def cleanUp(): Unit = {
- server.stop()
- google.close()
- super.cleanUp()
- }
-}
-
-object GoogleOAuth2AuthenticatorSpec {
- class MockGoogleAuthenticator(host: String) extends GoogleOAuth2Authenticator {
- protected override def authorizeUrl: String = {
- super.authorizeUrl.replace("https://accounts.google.com", host)
- }
-
- protected override def accessTokenEndpoint: String = {
- super.accessTokenEndpoint.replace("https://www.googleapis.com", host)
- }
-
- protected override def protectedResourceUrl: String = {
- super.protectedResourceUrl.replace("https://www.googleapis.com", host)
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
deleted file mode 100644
index c03532d..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.concurrent.{Await, Future}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.Sink
-
-import io.gearpump.util.Util
-// NOTE: This cannot be removed!!
-import io.gearpump.services.util.UpickleUtil._
-
-/**
- * Serves as a fake OAuth2 server.
- */
-class MockOAuth2Server(
- actorSystem: ActorSystem,
- var requestHandler: HttpRequest => HttpResponse) {
-
- implicit val system: ActorSystem = actorSystem
- implicit val materializer = ActorMaterializer()
- implicit val ec = system.dispatcher
-
- private var _port: Int = 0
- private var bindingFuture: Future[ServerBinding] = null
-
- def port: Int = _port
-
- def start(): Unit = {
- _port = Util.findFreePort().get
-
- val serverSource = Http().bind(interface = "127.0.0.1", port = _port)
- bindingFuture = {
- serverSource.to(Sink.foreach { connection =>
- connection handleWithSyncHandler requestHandler
- }).run()
- }
- }
-
- def stop(): Unit = {
- import scala.concurrent.duration._
- Await.result(bindingFuture.map(_.unbind()), 120.seconds)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
deleted file mode 100644
index b074813..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.util
-
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import upickle.default.{read, write}
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.metrics.Metrics.{Counter, MetricType}
-import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
-import io.gearpump.util.Graph
-
-class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- "UserConfig" should "serialize and deserialize with upickle correctly" in {
- val conf = UserConfig.empty.withString("key", "value")
- val serialized = write(conf)
- val deserialized = read[UserConfig](serialized)
- assert(deserialized.getString("key") == Some("value"))
- }
-
- "Graph" should "be able to serialize/deserialize correctly" in {
- val graph = new Graph[Int, String](List(0, 1), List((0, "edge", 1)))
- val serialized = write(graph)
-
- val deserialized = read[Graph[Int, String]](serialized)
-
- graph.vertices.toSet shouldBe deserialized.vertices.toSet
- graph.edges.toSet shouldBe deserialized.edges.toSet
- }
-
- "MetricType" should "be able to serialize/deserialize correctly" in {
- val metric: MetricType = Counter("counter", 100L)
- val serialized = write(metric)
- val deserialized = read[MetricType](serialized)
- metric shouldBe deserialized
- }
-
- "StreamingAppMasterDataDetail" should "serialize and deserialize with upickle correctly" in {
- val app = new StreamAppMasterSummary(appId = 0,
- processors = Map.empty[ProcessorId, ProcessorSummary],
- processorLevels = Map.empty[ProcessorId, Int]
- )
-
- val serialized = write(app)
- val deserialized = read[StreamAppMasterSummary](serialized)
- assert(deserialized == app)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala
new file mode 100644
index 0000000..b7f294c
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AdminServiceSpec.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class AdminServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+ override def testConfig: Config = TestUtil.DEFAULT_CONFIG
+
+ implicit def actorSystem: ActorSystem = system
+
+ it should "shutdown the ActorSystem when receiving terminate" in {
+ val route = new AdminService(actorSystem).route
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Post(s"/terminate") ~> route) ~> check {
+ assert(status.intValue() == 404)
+ }
+
+ Await.result(actorSystem.whenTerminated, 20.seconds)
+
+ // terminate should terminate current actor system
+ assert(actorSystem.whenTerminated.isCompleted)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
new file mode 100644
index 0000000..2ece554
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
+import akka.actor.ActorRef
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.testkit.TestActor.{AutoPilot, KeepRunning}
+import akka.testkit.{TestKit, TestProbe}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.slf4j.Logger
+import upickle.default.read
+
+import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
+import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
+import org.apache.gearpump.util.LogUtil
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
+ with Matchers with BeforeAndAfterAll {
+
+ override def testConfig: Config = TestUtil.UI_CONFIG
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+ private def actorRefFactory = system
+
+ val mockAppMaster = TestProbe()
+ val failure = LastFailure(System.currentTimeMillis(), "Some error")
+
+ lazy val jarStoreService = JarStoreService.get(system.settings.config)
+
+ def jarStore: JarStoreService = jarStoreService
+
+ private def master = mockMaster.ref
+
+ private def appMasterRoute = new AppMasterService(master, jarStore, system).route
+
+ mockAppMaster.setAutoPilot {
+ new AutoPilot {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+ case AppMasterDataDetailRequest(appId) =>
+ sender ! GeneralAppMasterSummary(appId)
+ KeepRunning
+ case QueryHistoryMetrics(path, _, _, _) =>
+ sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+ KeepRunning
+ case GetLastFailure(appId) =>
+ sender ! failure
+ KeepRunning
+ case GetExecutorSummary(0) =>
+ sender ! ExecutorSummary.empty
+ KeepRunning
+ case QueryExecutorConfig(0) =>
+ sender ! ExecutorConfig(system.settings.config)
+ KeepRunning
+ }
+ }
+ }
+
+ val mockMaster = TestProbe()
+ mockMaster.setAutoPilot {
+ new AutoPilot {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+ case ResolveAppId(0) =>
+ sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
+ KeepRunning
+ case AppMasterDataRequest(appId, _) =>
+ sender ! AppMasterData("active")
+ KeepRunning
+ case QueryAppMasterConfig(appId) =>
+ sender ! AppMasterConfig(null)
+ KeepRunning
+ }
+ }
+ }
+
+ "AppMasterService" should "return a JSON structure for GET request when detail = false" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check {
+ val responseBody = responseAs[String]
+ read[AppMasterData](responseBody)
+
+ // Checks the header, should contains no-cache header.
+ // Cache-Control:no-cache, max-age=0
+ val noCache = header[`Cache-Control`].get.value()
+ assert(noCache == "no-cache, max-age=0")
+ }
+
+ Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check {
+ val responseBody = responseAs[String]
+ }
+ }
+
+ "MetricsQueryService" should "return history metrics" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/appmaster/0/metrics/processor") ~> appMasterRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+ }
+ }
+
+ "AppMaster" should "return lastest error" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/appmaster/0/errors") ~> appMasterRoute) ~> check {
+ val responseBody = responseAs[String]
+ assert(read[LastFailure](responseBody) == failure)
+ }
+ }
+
+ "ConfigQueryService" should "return config for application" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+ }
+ }
+
+ it should "return config for executor " in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+ }
+ }
+
+ it should "return return executor summary" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check {
+ val responseBody = responseAs[String]
+ val executorSummary = read[ExecutorSummary](responseBody)
+ assert(executorSummary.id == 0)
+ }
+ }
+
+ override def afterAll {
+ TestKit.shutdownActorSystem(system)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
new file mode 100644
index 0000000..e365e9f
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import java.io.File
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Success, Try}
+
+import akka.actor.ActorRef
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.stream.scaladsl.{FileIO, Source}
+import akka.testkit.TestActor.{AutoPilot, KeepRunning}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.util.Graph
+
+class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
+ with Matchers with BeforeAndAfterAll {
+ import upickle.default.{read, write}
+
+ override def testConfig: Config = TestUtil.UI_CONFIG
+
+ private def actorRefFactory = system
+ val workerId = 0
+ val mockWorker = TestProbe()
+
+ lazy val jarStoreService = JarStoreService.get(system.settings.config)
+
+ private def master = mockMaster.ref
+
+ def jarStore: JarStoreService = jarStoreService
+
+ private def masterRoute = new MasterService(master, jarStore, system).route
+
+ mockWorker.setAutoPilot {
+ new AutoPilot {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+ case GetWorkerData(workerId) =>
+ sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId))
+ KeepRunning
+ case QueryHistoryMetrics(path, _, _, _) =>
+ sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+ KeepRunning
+ }
+ }
+ }
+
+ val mockMaster = TestProbe()
+ mockMaster.setAutoPilot {
+ new AutoPilot {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+ case GetMasterData =>
+ sender ! MasterData(null)
+ KeepRunning
+ case AppMastersDataRequest =>
+ sender ! AppMastersData(List.empty[AppMasterData])
+ KeepRunning
+ case GetAllWorkers =>
+ sender ! WorkerList(List(WorkerId(0, 0L)))
+ KeepRunning
+ case ResolveWorkerId(WorkerId(0, 0L)) =>
+ sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
+ KeepRunning
+ case QueryHistoryMetrics(path, _, _, _) =>
+ sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+ KeepRunning
+ case QueryMasterConfig =>
+ sender ! MasterConfig(null)
+ KeepRunning
+ case submit: SubmitApplication =>
+ sender ! SubmitApplicationResult(Success(0))
+ KeepRunning
+ }
+ }
+ }
+
+ it should "return master info when asked" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check {
+ // Checks the type
+ val content = responseAs[String]
+ read[MasterData](content)
+
+ // Checks the header, should contains no-cache header.
+ // Cache-Control:no-cache, max-age=0
+ val noCache = header[`Cache-Control`].get.value()
+ assert(noCache == "no-cache, max-age=0")
+ }
+
+ mockMaster.expectMsg(GetMasterData)
+ }
+
+ it should "return a json structure of appMastersData for GET request" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check {
+ // Checks the type
+ read[AppMastersData](responseAs[String])
+ }
+ mockMaster.expectMsg(AppMastersDataRequest)
+ }
+
+ it should "return a json structure of worker data for GET request" in {
+ implicit val customTimeout = RouteTestTimeout(25.seconds)
+ Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
+ // Checks the type
+ val workerListJson = responseAs[String]
+ val workers = read[List[WorkerSummary]](workerListJson)
+ assert(workers.size > 0)
+ workers.foreach { worker =>
+ worker.state shouldBe "active"
+ }
+ }
+ mockMaster.expectMsg(GetAllWorkers)
+ mockMaster.expectMsgType[ResolveWorkerId]
+ mockWorker.expectMsgType[GetWorkerData]
+ }
+
+ it should "return config for master" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+ }
+ mockMaster.expectMsg(QueryMasterConfig)
+ }
+
+ "submit invalid application" should "return an error" in {
+ implicit val routeTestTimeout = RouteTestTimeout(30.second)
+ val tempfile = new File("foo")
+ val request = entity(tempfile)
+
+ Post(s"/api/$REST_VERSION/master/submitapp", request) ~> masterRoute ~> check {
+ assert(response.status.intValue == 500)
+ }
+ }
+
+ private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
+ val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+ FileIO.fromFile(file, chunkSize = 100000))
+
+ val body = Source.single(
+ Multipart.FormData.BodyPart(
+ "file",
+ entity,
+ Map("filename" -> file.getName)))
+ val form = Multipart.FormData(body)
+
+ Marshal(form).to[RequestEntity]
+ }
+
+ "MetricsQueryService" should "return history metrics" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+ }
+ }
+
+ "submitDag" should "submit a SubmitApplicationRequest and get an appId > 0" in {
+ import org.apache.gearpump.util.Graph._
+ val processors = Map(
+ 0 -> ProcessorDescription(0, "A", parallelism = 1),
+ 1 -> ProcessorDescription(1, "B", parallelism = 1)
+ )
+ val dag = Graph(0 ~ "partitioner" ~> 1)
+ val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
+ Post(s"/api/$REST_VERSION/master/submitdag",
+ HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
+ val responseBody = responseAs[String]
+ val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
+ assert(submitApplicationResultValue.appId >= 0, "invalid appid")
+ }
+ }
+
+ "MasterService" should "return Gearpump built-in partitioner list" in {
+ (Get(s"/api/$REST_VERSION/master/partitioners") ~> masterRoute) ~> check {
+ val responseBody = responseAs[String]
+ val partitioners = read[BuiltinPartitioners](responseBody)
+ assert(partitioners.partitioners.length > 0, "invalid response")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala
new file mode 100644
index 0000000..c8bfb29
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/SecurityServiceSpec.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{AuthorizationFailedRejection, _}
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class SecurityServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+ override def testConfig: Config = TestUtil.UI_CONFIG
+
+ implicit def actorSystem: ActorSystem = system
+
+ it should "return 401 if not authenticated" in {
+ val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
+
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+
+ (Get(s"/resource") ~> security.route) ~> check {
+ assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+ }
+ }
+
+ "guest" should "get protected resource after authentication" in {
+ val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
+
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+
+ var cookie: HttpCookiePair = null
+ (Post(s"/login", FormData("username" -> "guest", "password" -> "guest"))
+ ~> security.route) ~> check {
+ assert("{\"user\":\"guest\"}" == responseAs[String])
+ assert(status.intValue() == 200)
+ assert(header[`Set-Cookie`].isDefined)
+ val httpCookie = header[`Set-Cookie`].get.cookie
+ assert(httpCookie.name == "gearpump_token")
+ cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
+ }
+
+ // After authentication, everything is fine.
+ Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+ responseAs[String] shouldEqual "OK"
+ }
+
+ // However, guest cannot access high-permission operations, like POST.
+ Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+ assert(rejection == AuthorizationFailedRejection)
+ }
+
+ // Logout, should clear the session
+ Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
+ assert("{\"user\":\"guest\"}" == responseAs[String])
+ assert(status.intValue() == 200)
+ assert(header[`Set-Cookie`].isDefined)
+ val httpCookie = header[`Set-Cookie`].get.cookie
+ assert(httpCookie.name == "gearpump_token")
+ assert(httpCookie.value == "deleted")
+ }
+
+ // Access again, rejected this time.
+ Get("/resource") ~> security.route ~> check {
+ assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+ }
+
+ Post("/resource") ~> security.route ~> check {
+ assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+ }
+ }
+
+ "admin" should "get protected resource after authentication" in {
+ val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
+
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+
+ var cookie: HttpCookiePair = null
+ (Post(s"/login", FormData("username" -> "admin", "password" -> "admin"))
+ ~> security.route) ~> check {
+ assert("{\"user\":\"admin\"}" == responseAs[String])
+ assert(status.intValue() == 200)
+ assert(header[`Set-Cookie`].isDefined)
+ val httpCookie = header[`Set-Cookie`].get.cookie
+ assert(httpCookie.name == "gearpump_token")
+ cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
+ }
+
+ // After authentication, everything is fine.
+ Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+ responseAs[String] shouldEqual "OK"
+ }
+
+ // Not like guest, admimn can also access POST
+ Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+ responseAs[String] shouldEqual "OK"
+ }
+
+ // Logout, should clear the session
+ Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
+ assert("{\"user\":\"admin\"}" == responseAs[String])
+ assert(status.intValue() == 200)
+ assert(header[`Set-Cookie`].isDefined)
+ val httpCookie = header[`Set-Cookie`].get.cookie
+ assert(httpCookie.name == "gearpump_token")
+ assert(httpCookie.value == "deleted")
+ }
+
+ // Access again, rejected this time.
+ Get("/resource") ~> security.route ~> check {
+ assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+ }
+
+ Post("/resource") ~> security.route ~> check {
+ assert(rejection.isInstanceOf[AuthenticationFailedRejection])
+ }
+ }
+}
+
+object SecurityServiceSpec {
+
+ val resource = new RouteService {
+ override def route: Route = {
+ get {
+ path("resource") {
+ complete("OK")
+ }
+ } ~
+ post {
+ path("resource") {
+ complete("OK")
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala
new file mode 100644
index 0000000..33f0866
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/StaticServiceSpec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+import scala.util.Try
+
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.util.Constants
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class StaticServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+ override def testConfig: Config = TestUtil.UI_CONFIG
+ private val supervisorPath = system.settings.config.getString(
+ Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
+
+ protected def route = new StaticService(system, supervisorPath).route
+
+ it should "return version" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/version") ~> route) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(responseBody == "Unknown-Version")
+
+ // By default, it will be cached.
+ assert(header[`Cache-Control`].isEmpty)
+ }
+ }
+
+ it should "get correct supervisor path" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/supervisor-actor-path") ~> route) ~> check {
+ val responseBody = responseAs[String]
+ val defaultSupervisorPath = ""
+ assert(responseBody == defaultSupervisorPath)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
new file mode 100644
index 0000000..4658c98
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/WorkerServiceSpec.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
+import akka.actor.ActorRef
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.testkit.TestActor.{AutoPilot, KeepRunning}
+import akka.testkit.{TestKit, TestProbe}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import org.apache.gearpump.jarstore.JarStoreService
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+class WorkerServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+
+ override def testConfig: Config = TestUtil.DEFAULT_CONFIG
+
+ protected def actorRefFactory = system
+
+ val mockWorker = TestProbe()
+
+ protected def master = mockMaster.ref
+
+ lazy val jarStoreService = JarStoreService.get(system.settings.config)
+
+ protected def workerRoute = new WorkerService(master, system).route
+
+ mockWorker.setAutoPilot {
+ new AutoPilot {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+ case GetWorkerData(workerId) =>
+ sender ! WorkerData(WorkerSummary.empty)
+ KeepRunning
+ case QueryWorkerConfig(workerId) =>
+ sender ! WorkerConfig(null)
+ KeepRunning
+ case QueryHistoryMetrics(path, _, _, _) =>
+ sender ! HistoryMetrics(path, List.empty[HistoryMetricsItem])
+ KeepRunning
+ }
+ }
+ }
+
+ val mockMaster = TestProbe()
+ mockMaster.setAutoPilot {
+ new AutoPilot {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
+ case ResolveWorkerId(workerId) =>
+ sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
+ KeepRunning
+ }
+ }
+ }
+
+ "ConfigQueryService" should "return config for worker" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config")
+ ~> workerRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+ }
+ }
+
+ it should "return WorkerData" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}")
+ ~> workerRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+
+ // Check the header, should contains no-cache header.
+ // Cache-Control:no-cache, max-age=0
+ val noCache = header[`Cache-Control`].get.value()
+ assert(noCache == "no-cache, max-age=0")
+ }
+ }
+
+ "MetricsQueryService" should "return history metrics" in {
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker")
+ ~> workerRoute) ~> check {
+ val responseBody = responseAs[String]
+ val config = Try(ConfigFactory.parseString(responseBody))
+ assert(config.isSuccess)
+ }
+ }
+
+ override def afterAll {
+ TestKit.shutdownActorSystem(system)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
new file mode 100644
index 0000000..fef581e
--- /dev/null
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpEntity.Strict
+import akka.http.scaladsl.model.MediaTypes._
+import akka.http.scaladsl.model.Uri.{Path, Query}
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import com.typesafe.config.ConfigFactory
+import org.scalatest.FlatSpec
+
+import org.apache.gearpump.security.Authenticator
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator
+
+class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
+
+ implicit val actorSystem: ActorSystem = system
+ private val server = new MockOAuth2Server(system, null)
+ server.start()
+ private val serverHost = s"http://127.0.0.1:${server.port}"
+
+ val configMap = Map(
+ "class" ->
+ "org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator",
+ "callback" -> s"$serverHost/login/oauth2/cloudfoundryuaa/callback",
+ "clientid" -> "gearpump_test2",
+ "clientsecret" -> "gearpump_test2",
+ "default-userrole" -> "user",
+ "icon" -> "/icons/uaa.png",
+ "uaahost" -> serverHost,
+ "additional-authenticator-enabled" -> "false")
+
+ val configString = ConfigFactory.parseMap(configMap.asJava)
+
+ lazy val uaa = {
+ val uaa = new CloudFoundryUAAOAuth2Authenticator
+ uaa.init(configString, system.dispatcher)
+ uaa
+ }
+
+ it should "generate the correct authorization request" in {
+ val parameters = Uri(uaa.getAuthorizationUrl()).query().toMap
+ assert(parameters("response_type") == "code")
+ assert(parameters("client_id") == configMap("clientid"))
+ assert(parameters("redirect_uri") == configMap("callback"))
+ assert(parameters("scope") == "openid,cloud_controller.read")
+ }
+
+ it should "authenticate the authorization code and return the correct profile" in {
+ val code = Map("code" -> "QGGVeA")
+ val accessToken = "e2922002-0218-4513-a62d-1da2ba64ee4c"
+ val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
+ val mail = "test@gearpump.io"
+
+ def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
+ assert(request.getHeader("Authorization").get.value() ==
+ "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
+ assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
+
+ val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
+ val form = Uri./.withQuery(Query(body)).query().toMap
+
+ assert(form("grant_type") == "authorization_code")
+ assert(form("code") == "QGGVeA")
+ assert(form("response_type") == "token")
+ assert(form("redirect_uri") == configMap("callback"))
+
+ val response =
+ s"""
+ |{
+ | "access_token": "$accessToken",
+ | "token_type": "bearer",
+ | "refresh_token": "$refreshToken",
+ | "expires_in": 43199,
+ | "scope": "openid",
+ | "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
+ |}
+ """.stripMargin
+ HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
+ }
+
+ def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
+ assert(request.getUri().query().get("access_token").get == accessToken)
+ val response =
+ s"""
+ |{
+ | "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
+ | "user_name": "user",
+ | "email": "$mail"
+ |}
+ """.stripMargin
+ HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
+ }
+
+ server.requestHandler = (request: HttpRequest) => {
+ if (request.uri.path.startsWith(Path("/oauth/token"))) {
+ accessTokenEndpoint(request)
+ } else if (request.uri.path.startsWith(Path("/userinfo"))) {
+ protectedResourceEndpoint(request)
+ } else {
+ fail("Unexpected access to " + request.uri.toString())
+ }
+ }
+
+ val userFuture = uaa.authenticate(code)
+ val user = Await.result(userFuture, 30.seconds)
+ assert(user.user == mail)
+ assert(user.permissionLevel == Authenticator.User.permissionLevel)
+ }
+
+ override def cleanUp(): Unit = {
+ server.stop()
+ uaa.close()
+ super.cleanUp()
+ }
+}