You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:37 UTC
[27/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
index 738bbad..e67731e 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,21 @@
package io.gearpump.services
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import akka.actor.ActorSystem
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
import io.gearpump.cluster.TestUtil
-import akka.http.scaladsl.testkit.{ScalatestRouteTest, RouteTestTimeout}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
-import io.gearpump.services.util.UpickleUtil._
-import scala.concurrent.duration._
-import scala.util.Try
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
-class AdminServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+class AdminServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
override def testConfig: Config = TestUtil.DEFAULT_CONFIG
@@ -36,14 +40,14 @@ class AdminServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers w
it should "shutdown the ActorSystem when receiving terminate" in {
val route = new AdminService(actorSystem).route
- implicit val customTimeout = RouteTestTimeout(15 seconds)
- (Post(s"/terminate") ~> route) ~> check{
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Post(s"/terminate") ~> route) ~> check {
assert(status.intValue() == 404)
}
- actorSystem.awaitTermination(20 seconds)
+ Await.result(actorSystem.whenTerminated, 20.seconds)
// terminate should terminate current actor system
- assert(actorSystem.isTerminated)
+ assert(actorSystem.whenTerminated.isCompleted)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
index 7b98cb6..59da80d 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,19 @@
package io.gearpump.services
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
import akka.actor.ActorRef
import akka.http.scaladsl.model.headers.`Cache-Control`
-import akka.http.scaladsl.server.RouteResult
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
-import akka.http.scaladsl.testkit.RouteTestTimeout
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.slf4j.Logger
+import upickle.default.read
+
import io.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
import io.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
@@ -33,13 +39,8 @@ import io.gearpump.cluster.TestUtil
import io.gearpump.jarstore.JarStoreService
import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
import io.gearpump.util.LogUtil
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.slf4j.Logger
-import upickle.default.read
-import akka.http.scaladsl.testkit.ScalatestRouteTest
+// NOTE: This cannot be removed!!!
import io.gearpump.services.util.UpickleUtil._
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
with Matchers with BeforeAndAfterAll {
@@ -47,7 +48,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
override def testConfig: Config = TestUtil.UI_CONFIG
private val LOG: Logger = LogUtil.getLogger(getClass)
- def actorRefFactory = system
+ private def actorRefFactory = system
val mockAppMaster = TestProbe()
val failure = LastFailure(System.currentTimeMillis(), "Some error")
@@ -56,13 +57,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
def jarStore: JarStoreService = jarStoreService
- def master = mockMaster.ref
+ private def master = mockMaster.ref
- def appMasterRoute = new AppMasterService(master, jarStore, system).route
+ private def appMasterRoute = new AppMasterService(master, jarStore, system).route
mockAppMaster.setAutoPilot {
new AutoPilot {
- def run(sender: ActorRef, msg: Any) = msg match {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
case AppMasterDataDetailRequest(appId) =>
sender ! GeneralAppMasterSummary(appId)
KeepRunning
@@ -78,7 +79,6 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
case QueryExecutorConfig(0) =>
sender ! ExecutorConfig(system.settings.config)
KeepRunning
-
}
}
}
@@ -86,7 +86,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
val mockMaster = TestProbe()
mockMaster.setAutoPilot {
new AutoPilot {
- def run(sender: ActorRef, msg: Any) = msg match {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
case ResolveAppId(0) =>
sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
KeepRunning
@@ -102,20 +102,19 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
"AppMasterService" should "return a JSON structure for GET request when detail = false" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check{
+ Get(s"/api/$REST_VERSION/appmaster/0?detail=false") ~> appMasterRoute ~> check {
val responseBody = responseAs[String]
read[AppMasterData](responseBody)
- // check the header, should contains no-cache header.
+ // Checks the header, which should contain the no-cache directive.
// Cache-Control:no-cache, max-age=0
val noCache = header[`Cache-Control`].get.value()
assert(noCache == "no-cache, max-age=0")
}
- Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check{
+ Get(s"/api/$REST_VERSION/appmaster/0?detail=true") ~> appMasterRoute ~> check {
val responseBody = responseAs[String]
}
-
}
"MetricsQueryService" should "return history metrics" in {
@@ -137,7 +136,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
"ConfigQueryService" should "return config for application" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/appmaster/0/config") ~> appMasterRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
@@ -146,7 +145,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
it should "return config for executor " in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/appmaster/0/executor/0/config") ~> appMasterRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
@@ -155,14 +154,13 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest
it should "return return executor summary" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/appmaster/0/executor/0") ~> appMasterRoute) ~> check {
val responseBody = responseAs[String]
val executorSummary = read[ExecutorSummary](responseBody)
assert(executorSummary.id == 0)
}
}
-
override def afterAll {
TestKit.shutdownActorSystem(system)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
index 1cfc01a..a24db18 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,55 +19,55 @@
package io.gearpump.services
import java.io.File
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Success, Try}
import akka.actor.ActorRef
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
-import akka.http.scaladsl.model.headers.{`Cache-Control`, `Set-Cookie`}
-import akka.stream.scaladsl.Source
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import akka.stream.scaladsl.{FileIO, Source}
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication}
import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList}
import io.gearpump.cluster.MasterToClient._
import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.WorkerSummary
-import io.gearpump.services.MasterService.{SubmitApplicationRequest, BuiltinPartitioners}
+import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
import io.gearpump.jarstore.JarStoreService
-import io.gearpump.streaming.ProcessorDescription
-import io.gearpump.util.{Constants, Graph}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import scala.concurrent.{Future, ExecutionContext}
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-import akka.stream.scaladsl.FileIO
+import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
+// NOTE: This cannot be removed!!!
import io.gearpump.services.util.UpickleUtil._
+import io.gearpump.streaming.ProcessorDescription
+import io.gearpump.util.Graph
-class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
- Matchers with BeforeAndAfterAll {
+class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
+ with Matchers with BeforeAndAfterAll {
import upickle.default.{read, write}
override def testConfig: Config = TestUtil.UI_CONFIG
- def actorRefFactory = system
+ private def actorRefFactory = system
val workerId = 0
val mockWorker = TestProbe()
lazy val jarStoreService = JarStoreService.get(system.settings.config)
- def master = mockMaster.ref
+ private def master = mockMaster.ref
def jarStore: JarStoreService = jarStoreService
- def masterRoute = new MasterService(master, jarStore, system).route
+ private def masterRoute = new MasterService(master, jarStore, system).route
mockWorker.setAutoPilot {
new AutoPilot {
- def run(sender: ActorRef, msg: Any) = msg match {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
case GetWorkerData(workerId) =>
sender ! WorkerData(WorkerSummary.empty.copy(state = "active", workerId = workerId))
KeepRunning
@@ -81,11 +81,11 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
val mockMaster = TestProbe()
mockMaster.setAutoPilot {
new AutoPilot {
- def run(sender: ActorRef, msg: Any) = msg match {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
case GetMasterData =>
sender ! MasterData(null)
KeepRunning
- case AppMastersDataRequest =>
+ case AppMastersDataRequest =>
sender ! AppMastersData(List.empty[AppMasterData])
KeepRunning
case GetAllWorkers =>
@@ -107,15 +107,14 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
}
}
-
it should "return master info when asked" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
(Get(s"/api/$REST_VERSION/master") ~> masterRoute) ~> check {
- // check the type
+ // Checks the type
val content = responseAs[String]
read[MasterData](content)
- // check the header, should contains no-cache header.
+ // Checks the header, which should contain the no-cache directive.
// Cache-Control:no-cache, max-age=0
val noCache = header[`Cache-Control`].get.value()
assert(noCache == "no-cache, max-age=0")
@@ -127,7 +126,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
it should "return a json structure of appMastersData for GET request" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
(Get(s"/api/$REST_VERSION/master/applist") ~> masterRoute) ~> check {
- //check the type
+ // Checks the type
read[AppMastersData](responseAs[String])
}
mockMaster.expectMsg(AppMastersDataRequest)
@@ -136,7 +135,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
it should "return a json structure of worker data for GET request" in {
implicit val customTimeout = RouteTestTimeout(25.seconds)
Get(s"/api/$REST_VERSION/master/workerlist") ~> masterRoute ~> check {
- //check the type
+ // Checks the type
val workerListJson = responseAs[String]
val workers = read[List[WorkerSummary]](workerListJson)
assert(workers.size > 0)
@@ -151,7 +150,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
it should "return config for master" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/master/config") ~> masterRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
@@ -170,7 +169,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
}
private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
- val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
+ val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
FileIO.fromFile(file, chunkSize = 100000))
val body = Source.single(
@@ -185,7 +184,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
"MetricsQueryService" should "return history metrics" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute)~> check {
+ (Get(s"/api/$REST_VERSION/master/metrics/master") ~> masterRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
@@ -200,7 +199,8 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with
)
val dag = Graph(0 ~ "partitioner" ~> 1)
val jsonValue = write(SubmitApplicationRequest("complexdag", processors, dag, null))
- Post(s"/api/$REST_VERSION/master/submitdag", HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
+ Post(s"/api/$REST_VERSION/master/submitdag",
+ HttpEntity(ContentTypes.`application/json`, jsonValue)) ~> masterRoute ~> check {
val responseBody = responseAs[String]
val submitApplicationResultValue = read[SubmitApplicationResultValue](responseBody)
assert(submitApplicationResultValue.appId >= 0, "invalid appid")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
index 10b45b9..3cf99e9 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,23 +18,23 @@
package io.gearpump.services
-import akka.http.scaladsl.testkit.{RouteTestTimeout}
-import com.typesafe.config.Config
-import io.gearpump.cluster.TestUtil
-import io.gearpump.services.util.UpickleUtil._
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
-import akka.actor.{ActorSystem}
-import akka.http.scaladsl.server._
import scala.concurrent.duration._
-import akka.http.scaladsl.model._
-import headers._
+
+import akka.actor.ActorSystem
import akka.http.scaladsl.model.FormData
-import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`}
-import akka.http.scaladsl.server.AuthorizationFailedRejection
+import akka.http.scaladsl.model.headers.{Cookie, `Set-Cookie`, _}
import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.testkit.ScalatestRouteTest
+import akka.http.scaladsl.server.{AuthorizationFailedRejection, _}
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
+import com.typesafe.config.Config
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+import io.gearpump.cluster.TestUtil
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
+
+class SecurityServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
override def testConfig: Config = TestUtil.UI_CONFIG
@@ -43,9 +43,9 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
it should "return 401 if not authenticated" in {
val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
- implicit val customTimeout = RouteTestTimeout(15 seconds)
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/resource") ~> security.route) ~> check{
+ (Get(s"/resource") ~> security.route) ~> check {
assert(rejection.isInstanceOf[AuthenticationFailedRejection])
}
}
@@ -53,10 +53,11 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
"guest" should "get protected resource after authentication" in {
val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
- implicit val customTimeout = RouteTestTimeout(15 seconds)
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
var cookie: HttpCookiePair = null
- (Post(s"/login", FormData("username" -> "guest", "password" -> "guest")) ~> security.route) ~> check{
+ (Post(s"/login", FormData("username" -> "guest", "password" -> "guest"))
+ ~> security.route) ~> check {
assert("{\"user\":\"guest\"}" == responseAs[String])
assert(status.intValue() == 200)
assert(header[`Set-Cookie`].isDefined)
@@ -65,18 +66,18 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
cookie = HttpCookiePair.apply(httpCookie.name, httpCookie.value)
}
- // after authentication, everything is fine.
+ // After authentication, everything is fine.
Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
responseAs[String] shouldEqual "OK"
}
- // however, guest cannot access high-permission operations, like POST.
- Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
+ // However, guest cannot access high-permission operations, like POST.
+ Post("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
assert(rejection == AuthorizationFailedRejection)
}
- // logout, should clear the session
- Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
+ // Logout, should clear the session
+ Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
assert("{\"user\":\"guest\"}" == responseAs[String])
assert(status.intValue() == 200)
assert(header[`Set-Cookie`].isDefined)
@@ -85,12 +86,12 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
assert(httpCookie.value == "deleted")
}
- // access again, rejected.
- Get("/resource") ~> security.route ~> check {
+ // Access again, rejected this time.
+ Get("/resource") ~> security.route ~> check {
assert(rejection.isInstanceOf[AuthenticationFailedRejection])
}
- Post("/resource") ~> security.route ~> check {
+ Post("/resource") ~> security.route ~> check {
assert(rejection.isInstanceOf[AuthenticationFailedRejection])
}
}
@@ -98,10 +99,11 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
"admin" should "get protected resource after authentication" in {
val security = new SecurityService(SecurityServiceSpec.resource, actorSystem)
- implicit val customTimeout = RouteTestTimeout(15 seconds)
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
var cookie: HttpCookiePair = null
- (Post(s"/login", FormData("username" -> "admin", "password" -> "admin")) ~> security.route) ~> check{
+ (Post(s"/login", FormData("username" -> "admin", "password" -> "admin"))
+ ~> security.route) ~> check {
assert("{\"user\":\"admin\"}" == responseAs[String])
assert(status.intValue() == 200)
assert(header[`Set-Cookie`].isDefined)
@@ -110,7 +112,7 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
cookie = HttpCookiePair(httpCookie.name, httpCookie.value)
}
- // after authentication, everything is fine.
+ // After authentication, everything is fine.
Get("/resource").addHeader(Cookie(cookie)) ~> security.route ~> check {
responseAs[String] shouldEqual "OK"
}
@@ -120,8 +122,8 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
responseAs[String] shouldEqual "OK"
}
- // logout, should clear the session
- Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check{
+ // Logout, should clear the session
+ Post(s"/logout").addHeader(Cookie(cookie)) ~> security.route ~> check {
assert("{\"user\":\"admin\"}" == responseAs[String])
assert(status.intValue() == 200)
assert(header[`Set-Cookie`].isDefined)
@@ -130,8 +132,8 @@ class SecurityServiceSpec extends FlatSpec with ScalatestRouteTest with Matcher
assert(httpCookie.value == "deleted")
}
- // access again, rejected.
- Get("/resource") ~> security.route ~> check {
+ // Access again, rejected this time.
+ Get("/resource") ~> security.route ~> check {
assert(rejection.isInstanceOf[AuthenticationFailedRejection])
}
@@ -145,7 +147,7 @@ object SecurityServiceSpec {
val resource = new RouteService {
override def route: Route = {
- get{
+ get {
path("resource") {
complete("OK")
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
index 501776e..f61e2f5 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,41 +18,43 @@
package io.gearpump.services
+import scala.concurrent.duration._
+import scala.util.Try
+
import akka.http.scaladsl.model.headers.`Cache-Control`
import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
import io.gearpump.cluster.TestUtil
import io.gearpump.util.Constants
+// NOTE: This cannot be removed!!!
import io.gearpump.services.util.UpickleUtil._
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
-import scala.util.Try
-import scala.concurrent.duration._
-
-class StaticServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+class StaticServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
override def testConfig: Config = TestUtil.UI_CONFIG
- private val supervisorPath = system.settings.config.getString(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
-
+ private val supervisorPath = system.settings.config.getString(
+ Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
- def route = new StaticService(system, supervisorPath).route
+ protected def route = new StaticService(system, supervisorPath).route
it should "return version" in {
- implicit val customTimeout = RouteTestTimeout(15 seconds)
- (Get(s"/version") ~> route) ~> check{
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/version") ~> route) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(responseBody == "Unknown-Version")
- // by default, it will be cached.
+ // By default, it will be cached.
assert(header[`Cache-Control`].isEmpty)
}
}
it should "get correct supervisor path" in {
- implicit val customTimeout = RouteTestTimeout(15 seconds)
- (Get(s"/supervisor-actor-path") ~> route) ~> check{
+ implicit val customTimeout = RouteTestTimeout(15.seconds)
+ (Get(s"/supervisor-actor-path") ~> route) ~> check {
val responseBody = responseAs[String]
val defaultSupervisorPath = ""
assert(responseBody == defaultSupervisorPath)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
index dc8d5a7..2676a16 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,42 +18,44 @@
package io.gearpump.services
+import scala.concurrent.duration._
+import scala.util.{Success, Try}
+
import akka.actor.ActorRef
import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
import akka.testkit.TestActor.{AutoPilot, KeepRunning}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
-import io.gearpump.WorkerId
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId}
import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig}
import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.worker.WorkerSummary
+import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
import io.gearpump.jarstore.JarStoreService
+// NOTE: This cannot be removed!!!
import io.gearpump.services.util.UpickleUtil._
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import scala.concurrent.duration._
-import scala.util.{Success, Try}
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
+class WorkerServiceSpec
+ extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
override def testConfig: Config = TestUtil.DEFAULT_CONFIG
- def actorRefFactory = system
+ protected def actorRefFactory = system
val mockWorker = TestProbe()
- def master = mockMaster.ref
+ protected def master = mockMaster.ref
lazy val jarStoreService = JarStoreService.get(system.settings.config)
- def workerRoute = new WorkerService(master, system).route
+ protected def workerRoute = new WorkerService(master, system).route
mockWorker.setAutoPilot {
new AutoPilot {
- def run(sender: ActorRef, msg: Any) = msg match {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
case GetWorkerData(workerId) =>
sender ! WorkerData(WorkerSummary.empty)
KeepRunning
@@ -70,7 +72,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
val mockMaster = TestProbe()
mockMaster.setAutoPilot {
new AutoPilot {
- def run(sender: ActorRef, msg: Any) = msg match {
+ def run(sender: ActorRef, msg: Any): AutoPilot = msg match {
case ResolveWorkerId(workerId) =>
sender ! ResolveWorkerIdResult(Success(mockWorker.ref))
KeepRunning
@@ -78,10 +80,10 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
}
}
-
"ConfigQueryService" should "return config for worker" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") ~> workerRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config")
+ ~> workerRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
@@ -90,12 +92,13 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
it should "return WorkerData" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") ~> workerRoute) ~> check{
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}")
+ ~> workerRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
- // check the header, should contains no-cache header.
+ // Check the header; it should contain the no-cache header.
// Cache-Control:no-cache, max-age=0
val noCache = header[`Cache-Control`].get.value()
assert(noCache == "no-cache, max-age=0")
@@ -104,7 +107,8 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers
"MetricsQueryService" should "return history metrics" in {
implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") ~> workerRoute) ~> check {
+ (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker")
+ ~> workerRoute) ~> check {
val responseBody = responseAs[String]
val config = Try(ConfigFactory.parseString(responseBody))
assert(config.isSuccess)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
index 68a3506..136026a 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/CloudFoundryUAAOAuth2AuthenticatorSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,23 @@
package io.gearpump.services.security.oauth2
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpEntity.Strict
import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Query, Path}
+import akka.http.scaladsl.model.Uri.{Path, Query}
import akka.http.scaladsl.model._
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.typesafe.config.ConfigFactory
-import io.gearpump.security.Authenticator
-import io.gearpump.services.security.oauth2.impl.{CloudFoundryUAAOAuth2Authenticator, GoogleOAuth2Authenticator}
import org.scalatest.FlatSpec
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import scala.collection.JavaConverters._
+
+import io.gearpump.security.Authenticator
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
+import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator
class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
@@ -71,8 +75,9 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
val mail = "test@gearpump.io"
- def accessTokenEndpoint(request: HttpRequest) = {
- assert(request.getHeader("Authorization").get.value() == "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
+ def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
+ assert(request.getHeader("Authorization").get.value() ==
+ "Basic Z2VhcnB1bXBfdGVzdDI6Z2VhcnB1bXBfdGVzdDI=")
assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
val body = request.entity.asInstanceOf[Strict].data.decodeString("UTF-8")
@@ -85,27 +90,27 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
val response =
s"""
- |{
- | "access_token": "$accessToken",
- | "token_type": "bearer",
- | "refresh_token": "$refreshToken",
- | "expires_in": 43199,
- | "scope": "openid",
- | "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
- |}
+ |{
+ | "access_token": "$accessToken",
+ | "token_type": "bearer",
+ | "refresh_token": "$refreshToken",
+ | "expires_in": 43199,
+ | "scope": "openid",
+ | "jti": "e8739474-b2fa-42eb-a9ad-e065bf79d7e9"
+ |}
""".stripMargin
HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
}
- def protectedResourceEndpoint(request: HttpRequest) = {
+ def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
assert(request.getUri().query().get("access_token").get == accessToken)
val response =
s"""
- |{
- | "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
- | "user_name": "user",
- | "email": "$mail"
- |}
+ |{
+ | "user_id": "e2922002-0218-4513-a62d-1da2ba64ee4c",
+ | "user_name": "user",
+ | "email": "$mail"
+ |}
""".stripMargin
HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
}
@@ -121,7 +126,7 @@ class CloudFoundryUAAOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRout
}
val userFuture = uaa.authenticate(code)
- val user = Await.result(userFuture, 30 seconds)
+ val user = Await.result(userFuture, 30.seconds)
assert(user.user == mail)
assert(user.permissionLevel == Authenticator.User.permissionLevel)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
index 58b4a34..70d8bb0 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/GoogleOAuth2AuthenticatorSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +18,24 @@
package io.gearpump.services.security.oauth2
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpEntity.Strict
import akka.http.scaladsl.model.MediaTypes._
-import akka.http.scaladsl.model.Uri.{Query, Path}
+import akka.http.scaladsl.model.Uri.{Path, Query}
import akka.http.scaladsl.model._
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.typesafe.config.ConfigFactory
-import io.gearpump.security.Authenticator
-import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator
-import io.gearpump.services.security.oauth2.impl.{GoogleOAuth2Authenticator, CloudFoundryUAAOAuth2Authenticator}
import org.scalatest.FlatSpec
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration._
+import io.gearpump.security.Authenticator
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
+import io.gearpump.services.security.oauth2.GoogleOAuth2AuthenticatorSpec.MockGoogleAuthenticator
+import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator
class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
@@ -71,7 +74,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
val refreshToken = "eyJhbGciOiJSUzI1NiJ9.eyJqdGkiOiI2Nm"
val mail = "test@gearpump.io"
- def accessTokenEndpoint(request: HttpRequest) = {
+ def accessTokenEndpoint(request: HttpRequest): HttpResponse = {
assert(request.entity.contentType.mediaType.value == "application/x-www-form-urlencoded")
@@ -85,33 +88,37 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
assert(form("redirect_uri") == configMap("callback"))
assert(form("scope") == GoogleOAuth2Authenticator.Scope)
- val response = s"""
- |{
- | "access_token": "$accessToken",
- | "token_type": "Bearer",
- | "expires_in": 3591,
- | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3"
- |}
+ // scalastyle:off line.size.limit
+ val response =
+ s"""
+ |{
+ | "access_token": "$accessToken",
+ | "token_type": "Bearer",
+ | "expires_in": 3591,
+ | "id_token": "eyJhbGciOiJSUzI1NiIsImtpZCI6ImY1NjQyYzY2MzdhYWQyOTJiOThlOGIwN2MwMzIxN2QwMzBmOTdkODkifQ.eyJpc3"
+ |}
""".stripMargin
+ // scalastyle:on line.size.limit
HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
}
- def protectedResourceEndpoint(request: HttpRequest) = {
+ def protectedResourceEndpoint(request: HttpRequest): HttpResponse = {
assert(request.getUri().query().get("access_token").get == accessToken)
- val response =s"""
- |{
- | "kind": "plus#person",
- | "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8",
- | "nickname": "gearpump",
- | "gender": "female",
- | "emails": [
- | {
- | "value": "$mail",
- | "type": "account"
- | }
- | ]
- | }
+ val response =
+ s"""
+ |{
+ | "kind": "plus#person",
+ | "etag": "4OZ_Kt6ujOh1jaML_U6RM6APqoE/mZ57HcMOYXaNXYXS5XEGJ9yVsI8",
+ | "nickname": "gearpump",
+ | "gender": "female",
+ | "emails": [
+ | {
+ | "value": "$mail",
+ | "type": "account"
+ | }
+ | ]
+ | }
""".stripMargin
HttpResponse(entity = HttpEntity(ContentType(`application/json`), response))
}
@@ -127,7 +134,7 @@ class GoogleOAuth2AuthenticatorSpec extends FlatSpec with ScalatestRouteTest {
}
val userFuture = google.authenticate(code)
- val user = Await.result(userFuture, 30 seconds)
+ val user = Await.result(userFuture, 30.seconds)
assert(user.user == mail)
assert(user.permissionLevel == Authenticator.Guest.permissionLevel)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
index 9b6b5e9..c03532d 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/security/oauth2/MockOAuth2Server.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,18 @@
package io.gearpump.services.security.oauth2
-import akka.actor.{ActorRef, ActorSystem}
+import scala.concurrent.{Await, Future}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
-import akka.http.scaladsl.model.{HttpResponse, HttpRequest}
-import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.server.Directives._
-import akka.stream.scaladsl.{Sink, Source}
-import io.gearpump.util.Util
-import akka.pattern.ask
+import akka.stream.scaladsl.Sink
-import scala.concurrent.{Await, Future}
+import io.gearpump.util.Util
+// NOTE: This cannot be removed!!
+import io.gearpump.services.util.UpickleUtil._
/**
* Serves as a fake OAuth2 server.
@@ -48,12 +48,11 @@ class MockOAuth2Server(
def port: Int = _port
def start(): Unit = {
- _port = Util.findFreePort.get
+ _port = Util.findFreePort().get
val serverSource = Http().bind(interface = "127.0.0.1", port = _port)
bindingFuture = {
serverSource.to(Sink.foreach { connection =>
- println("Accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler
}).run()
}
@@ -61,6 +60,6 @@ class MockOAuth2Server(
def stop(): Unit = {
import scala.concurrent.duration._
- Await.result(bindingFuture.map(_.unbind()), 120 seconds)
+ Await.result(bindingFuture.map(_.unbind()), 120.seconds)
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
index 9b3069e..b074813 100644
--- a/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
+++ b/services/jvm/src/test/scala/io/gearpump/services/util/UpickleSpec.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
package io.gearpump.services.util
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+import upickle.default.{read, write}
+
import io.gearpump.cluster.UserConfig
import io.gearpump.metrics.Metrics.{Counter, MetricType}
import io.gearpump.services.util.UpickleUtil._
import io.gearpump.streaming.ProcessorId
import io.gearpump.streaming.appmaster.{ProcessorSummary, StreamAppMasterSummary}
import io.gearpump.util.Graph
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-import upickle.default.{read, write}
class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/README.md
----------------------------------------------------------------------
diff --git a/streaming/README.md b/streaming/README.md
deleted file mode 100644
index 86d4ec2..0000000
--- a/streaming/README.md
+++ /dev/null
@@ -1,116 +0,0 @@
-A very basic DSL which support flatmap, reduce, and etc..
-======================
-
-
-Supported operators:
-------------------
-```scala
-class Stream[T](dag: Graph[Op, OpEdge], private val thisNode: Op, private val edge: Option[OpEdge] = None) {
-
- /**
- * convert a value[T] to a list of value[R]
- * @param fun
- * @tparam R
- * @return
- */
- def flatMap[R](fun: T => TraversableOnce[R]): Stream[R]
-
- /**
- * convert value[T] to value[R]
- * @param fun
- * @tparam R
- * @return
- */
- def map[R](fun: T => R): Stream[R]
-
- /**
- * reserve records when fun(T) == true
- * @param fun
- * @return
- */
- def filter(fun: T => Boolean): Stream[T]
-
- /**
- * Reduce opeartion
- * @param fun
- * @return
- */
- def reduce(fun: (T, T) => T): Stream[T]
-
- /**
- * Log to task log file
- */
- def log(): Unit
-
- /**
- * Merge data from two stream into one
- * @param other
- * @return
- */
- def merge(other: Stream[T]): Stream[T]
-
- /**
- * Group by fun(T)
- *
- * For example, we have T type, People(name: String, gender: String, age: Int)
- * groupBy[People](_.gender) will group the people by gender.
- *
- * You can append other combinators after groupBy
- *
- * For example,
- *
- * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
- *
- * @param fun
- * @param parallism
- * @tparam Group
- * @return
- */
- def groupBy[Group](fun: T => Group, parallism: Int = 1): Stream[T]
-
- /**
- * connect with a low level Processor(TaskDescription)
- * @param processor
- * @param parallism
- * @tparam R
- * @return
- */
- def process[R](processor: Class[_ <: Task], parallism: Int): Stream[R]
-}
-
-```
-
-How to define the DSL
----------------
-WordCount:
-
-```scala
-val context = ClientContext(master)
- val app = StreamApp("dsl", context)
-
- val data = "This is a good start, bingo!! bingo!!"
- app.fromCollection(data.lines.toList).
- // word => (word, count)
- flatMap(line => line.split("[\\s]+")).map((_, 1)).
- // (word, count1), (word, count2) => (word, count1 + count2)
- groupBy(kv => kv._1).reduce((left, right) => (left._1, left._2 + right._2))
-
- val appId = context.submit(app)
- context.close()
-```
-
-For the full example, please check https://github.com/intel-hadoop/gearpump/tree/master/experiments/dsl/src/main/scala/io.gearpump/streaming/dsl/example
-
-
-Run an example
----------------------
-```bash
-# start master
-bin\local
-
-# start UI
-bin\services
-
-# start example topology
-bin\gear io.gearpump.streaming.dsl.example.WordCount
-```
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
index 8b81253..dbae1c4 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Graph.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,8 +19,15 @@
package io.gearpump.streaming.javaapi;
import io.gearpump.partitioner.Partitioner;
+import io.gearpump.streaming.Processor;
+import io.gearpump.streaming.task.Task;
-public class Graph extends io.gearpump.util.Graph<io.gearpump.streaming.Processor<? extends io.gearpump.streaming.task.Task>, Partitioner> {
+/**
+ * Java version of Graph
+ *
+ * See {@link io.gearpump.util.Graph}
+ */
+public class Graph extends io.gearpump.util.Graph<Processor<? extends Task>, Partitioner> {
public Graph() {
super(null, null);
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
index 52b5d61..974183e 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Processor.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,6 +27,11 @@ import io.gearpump.streaming.source.DataSource;
import io.gearpump.streaming.source.DataSourceProcessor;
import io.gearpump.streaming.source.DataSourceTask;
+/**
+ * Java version of Processor
+ *
+ * See {@link io.gearpump.streaming.Processor}
+ */
public class Processor<T extends io.gearpump.streaming.task.Task> implements io.gearpump.streaming.Processor<T> {
private Class<T> _taskClass;
private int _parallelism = 1;
@@ -43,46 +48,49 @@ public class Processor<T extends io.gearpump.streaming.task.Task> implements io.
}
/**
- * Create a Sink Processor
- * @param dataSink the data sink itself
- * @param parallelism the parallelism of this processor
- * @param description the description for this processor
- * @param taskConf the configuration for this processor
- * @param system actor system
- * @return the new created sink processor
+ * Creates a Sink Processor
+ *
+ * @param dataSink the data sink itself
+ * @param parallelism the parallelism of this processor
+ * @param description the description for this processor
+ * @param taskConf the configuration for this processor
+ * @param system actor system
+ * @return the newly created sink processor
*/
- public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
+ public static Processor<DataSinkTask> sink(DataSink dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
io.gearpump.streaming.Processor<DataSinkTask> p = DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system);
return new Processor(p);
}
/**
- * Create a Source Processor
- * @param source the data source itself
- * @param parallelism the parallelism of this processor
- * @param description the description of this processor
- * @param taskConf the configuration of this processor
- * @param system actor system
- * @return the new created source processor
+ * Creates a Source Processor
+ *
+ * @param source the data source itself
+ * @param parallelism the parallelism of this processor
+ * @param description the description of this processor
+ * @param taskConf the configuration of this processor
+ * @param system actor system
+ * @return the newly created source processor
*/
- public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
+ public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) {
io.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system);
return new Processor(p);
}
public Processor(io.gearpump.streaming.Processor<T> processor) {
- this._taskClass = (Class)(processor.taskClass());
+ this._taskClass = (Class) (processor.taskClass());
this._parallelism = processor.parallelism();
this._description = processor.description();
this._userConf = processor.taskConf();
}
/**
- * Create a general processor with user specified task logic.
- * @param taskClass task implementation class of this processor (shall be a derived class from {@link Task}
+ * Creates a general processor with user specified task logic.
+ *
+ * @param taskClass task implementation class of this processor (shall be a derived class from {@link Task})
* @param parallelism, how many initial tasks you want to use
* @param description, some text to describe this processor
- * @param taskConf, Processor specific configuration
+ * @param taskConf, Processor specific configuration
*/
public Processor(Class<T> taskClass, int parallelism, String description, UserConfig taskConf) {
this._taskClass = taskClass;
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
index d017793..150a26f 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/StreamApplication.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package io.gearpump.streaming.javaapi;
import akka.actor.ActorSystem;
@@ -24,21 +23,25 @@ import io.gearpump.cluster.Application;
import io.gearpump.cluster.ApplicationMaster;
import io.gearpump.cluster.UserConfig;
-
+/**
+ * Java version of StreamApplication.
+ *
+ * Also see {@link io.gearpump.streaming.StreamApplication}
+ */
public class StreamApplication implements Application {
private io.gearpump.streaming.StreamApplication app;
/**
- * Create a streaming application
- * @param name name of the application
- * @param conf user configuration
- * @param graph the DAG
+ * Creates a streaming application
*
+ * @param name Name of the application
+ * @param conf User configuration
+ * @param graph The DAG
*/
public StreamApplication(String name, UserConfig conf, Graph graph) {
//by pass the tricky type check in scala 2.10
io.gearpump.util.Graph untypedGraph = graph;
this.app = io.gearpump.streaming.StreamApplication.apply(
- name, untypedGraph, conf);
+ name, untypedGraph, conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
index 836287d..45fae19 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/Task.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -24,6 +24,11 @@ import io.gearpump.cluster.UserConfig;
import io.gearpump.streaming.task.StartTime;
import io.gearpump.streaming.task.TaskContext;
+/**
+ * Java version of Task.
+ *
+ * See {@link io.gearpump.streaming.task.Task}
+ */
public class Task extends io.gearpump.streaming.task.Task {
protected TaskContext context;
protected UserConfig userConf;
@@ -40,11 +45,14 @@ public class Task extends io.gearpump.streaming.task.Task {
}
@Override
- public void onStart(StartTime startTime){}
+ public void onStart(StartTime startTime) {
+ }
@Override
- public void onNext(Message msg){}
+ public void onNext(Message msg) {
+ }
@Override
- public void onStop(){}
+ public void onStop() {
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
index e4e137f..bb97442 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,9 @@ package io.gearpump.streaming.javaapi.dsl.functions;
import java.io.Serializable;
/**
- * a function that decides whether to reserve a value<T>
+ * Filter function
+ *
+ * @param <T> Message of type T
*/
public interface FilterFunction<T> extends Serializable {
boolean apply(T t);
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
index b65a338..3e18cf1 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,7 +22,10 @@ import java.io.Serializable;
import java.util.Iterator;
/**
- * a function that converts a value<T> to a iterator of value<R>
+ * Function that converts a value of type T to an iterator of values of type R.
+ *
+ * @param <T> Input value type
+ * @param <R> Return value type
*/
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> apply(T t);
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
index 651c477..2ba524e 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,10 @@ package io.gearpump.streaming.javaapi.dsl.functions;
import java.io.Serializable;
/**
- * a function that puts a value<T> into a Group
+ * GroupBy function which assigns a value of type T to a group
+ *
+ * @param <T> Input value type
+ * @param <Group> Group Type
*/
public interface GroupByFunction<T, Group> extends Serializable {
Group apply(T t);
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
index a30a671..b4bd6ac 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,10 @@ package io.gearpump.streaming.javaapi.dsl.functions;
import java.io.Serializable;
/**
- * a function that converts a value<T> to value<R>
+ * Function that maps a value of type T to a value of type R
+ *
+ * @param <T> Input value type
+ * @param <R> Output value type
*/
public interface MapFunction<T, R> extends Serializable {
R apply(T t);
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
index 0f4bb18..f439c0a 100644
--- a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,9 @@ package io.gearpump.streaming.javaapi.dsl.functions;
import java.io.Serializable;
/**
- * a function that applies reduce operation
+ * Function that applies a reduce operation
+ *
+ * @param <T> Input value type
*/
public interface ReduceFunction<T> extends Serializable {
T apply(T t1, T t2);
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
index 370b36c..06cf022 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/ClusterMessage.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,29 +18,34 @@
package io.gearpump.streaming
+import scala.language.existentials
+
import akka.actor.ActorRef
-import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
-import io.gearpump.streaming.task.{TaskId, Subscriber}
+
import io.gearpump.TimeStamp
import io.gearpump.cluster.appmaster.WorkerInfo
import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
+import io.gearpump.streaming.task.{Subscriber, TaskId}
import io.gearpump.transport.HostPort
-import scala.language.existentials
-
object AppMasterToExecutor {
- case class LaunchTasks(taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription, subscribers: List[Subscriber])
+ case class LaunchTasks(
+ taskId: List[TaskId], dagVersion: Int, processorDescription: ProcessorDescription,
+ subscribers: List[Subscriber])
case object TasksLaunched
/**
* dagVersion, life, and subscribers will be changed on target task list.
*/
- case class ChangeTasks(taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+ case class ChangeTasks(
+ taskId: List[TaskId], dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
case class TasksChanged(taskIds: List[TaskId])
- case class ChangeTask(taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
+ case class ChangeTask(
+ taskId: TaskId, dagVersion: Int, life: LifeTime, subscribers: List[Subscriber])
case class TaskChanged(taskId: TaskId, dagVersion: Int)
@@ -52,7 +57,8 @@ object AppMasterToExecutor {
case class TaskLocationsReceived(dagVersion: Int, executorId: ExecutorId)
- case class TaskLocationsRejected(dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable)
+ case class TaskLocationsRejected(
+ dagVersion: Int, executorId: ExecutorId, reason: String, ex: Throwable)
case class StartAllTasks(dagVersion: Int)
@@ -65,10 +71,11 @@ object AppMasterToExecutor {
}
object ExecutorToAppMaster {
- case class RegisterExecutor(executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo)
+ case class RegisterExecutor(
+ executor: ActorRef, executorId: Int, resource: Resource, worker : WorkerInfo)
- case class RegisterTask(taskId: TaskId, executorId : Int, task: HostPort)
- case class UnRegisterTask(taskId: TaskId, executorId : Int)
+ case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort)
+ case class UnRegisterTask(taskId: TaskId, executorId: Int)
case class MessageLoss(executorId: Int, taskId: TaskId, cause: String)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
index fa23807..becf31a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/Constants.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package io.gearpump.streaming
object Constants {
@@ -11,6 +29,9 @@ object Constants {
val GEARPUMP_STREAMING_REGISTER_TASK_TIMEOUT_MS = "gearpump.streaming.register-task-timeout-ms"
- val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT = "gearpump.streaming.max-pending-message-count-per-connection"
- val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT = "gearpump.streaming.ack-once-every-message-count"
+ val GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT =
+ "gearpump.streaming.max-pending-message-count-per-connection"
+
+ val GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT =
+ "gearpump.streaming.ack-once-every-message-count"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
index a6b8cba..a73fd48 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/DAG.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,16 @@
package io.gearpump.streaming
-import io.gearpump.streaming.task.TaskId
import io.gearpump.partitioner.PartitionerDescription
+import io.gearpump.streaming.task.TaskId
import io.gearpump.util.Graph
/**
- * DAG is wrapper for [[Graph]] for streaming applications.
+ * DAG is a wrapper for [[io.gearpump.util.Graph]] for streaming applications.
*/
-case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription], graph : Graph[ProcessorId, PartitionerDescription]) extends Serializable {
+case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription],
+ graph : Graph[ProcessorId, PartitionerDescription])
+ extends Serializable {
def isEmpty: Boolean = {
processors.isEmpty
@@ -46,15 +48,15 @@ case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription]
}
object DAG {
- def apply (graph : Graph[ProcessorDescription, PartitionerDescription], version: Int = 0) : DAG = {
- val processors = graph.vertices.map{processorDescription =>
+ def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = {
+ val processors = graph.vertices.map { processorDescription =>
(processorDescription.id, processorDescription)
}.toMap
- val dag = graph.mapVertex{ processor =>
+ val dag = graph.mapVertex { processor =>
processor.id
}
new DAG(version, processors, dag)
}
- def empty() = apply(Graph.empty[ProcessorDescription, PartitionerDescription])
+ def empty: DAG = apply(Graph.empty[ProcessorDescription, PartitionerDescription])
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
index 4eabcd2..8eb866d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/MessageSerializer.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -75,7 +75,7 @@ class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckReques
}
}
-class AckRequestSerializer extends TaskMessageSerializer[AckRequest]{
+class AckRequestSerializer extends TaskMessageSerializer[AckRequest] {
val taskIdSerializer = new TaskIdSerializer
override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
index b68054a..0ca4d92 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,20 @@
package io.gearpump.streaming
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
import akka.actor.ActorSystem
-import io.gearpump.streaming.appmaster.AppMaster
-import io.gearpump.streaming.task.Task
+
import io.gearpump.TimeStamp
import io.gearpump.cluster._
import io.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
-import io.gearpump.util.{LogUtil, Graph, ReferenceEqual}
-
-import scala.language.implicitConversions
-import scala.reflect.ClassTag
+import io.gearpump.streaming.appmaster.AppMaster
+import io.gearpump.streaming.task.Task
+import io.gearpump.util.{Graph, LogUtil, ReferenceEqual}
/**
* Processor is the blueprint for tasks.
- *
*/
trait Processor[+T <: Task] extends ReferenceEqual {
@@ -41,7 +41,7 @@ trait Processor[+T <: Task] extends ReferenceEqual {
def parallelism: Int
/**
- * The custom [[UserConfig]], it is used to initialize a task in runtime.
+ * The custom [[io.gearpump.cluster.UserConfig]], used to initialize a task at runtime.
*/
def taskConf: UserConfig
@@ -59,20 +59,29 @@ trait Processor[+T <: Task] extends ReferenceEqual {
}
object Processor {
- def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task]): ProcessorDescription = {
+ def ProcessorToProcessorDescription(id: ProcessorId, processor: Processor[_ <: Task])
+ : ProcessorDescription = {
import processor._
ProcessorDescription(id, taskClass.getName, parallelism, description, taskConf)
}
- def apply[T<: Task](parallelism : Int, description: String = "", taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T]): DefaultProcessor[T] = {
- new DefaultProcessor[T](parallelism, description, taskConf, classtag.runtimeClass.asInstanceOf[Class[T]])
+ def apply[T<: Task](
+ parallelism : Int, description: String = "",
+ taskConf: UserConfig = UserConfig.empty)(implicit classtag: ClassTag[T])
+ : DefaultProcessor[T] = {
+ new DefaultProcessor[T](parallelism, description, taskConf,
+ classtag.runtimeClass.asInstanceOf[Class[T]])
}
- def apply[T<: Task](taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig): DefaultProcessor[T] = {
+ def apply[T<: Task](
+ taskClazz: Class[T], parallelism : Int, description: String, taskConf: UserConfig)
+ : DefaultProcessor[T] = {
new DefaultProcessor[T](parallelism, description, taskConf, taskClazz)
}
- case class DefaultProcessor[T<: Task](parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T]) extends Processor[T] {
+ case class DefaultProcessor[T<: Task](
+ parallelism : Int, description: String, taskConf: UserConfig, taskClass: Class[T])
+ extends Processor[T] {
def withParallelism(parallel: Int): DefaultProcessor[T] = {
new DefaultProcessor[T](parallel, description, taskConf, taskClass)
@@ -93,7 +102,6 @@ object Processor {
*
* When input message's timestamp is beyond current processor's lifetime,
* then it will not be processed by this processor.
- *
*/
case class LifeTime(birth: TimeStamp, death: TimeStamp) {
def contains(timestamp: TimeStamp): Boolean = {
@@ -112,7 +120,9 @@ object LifeTime {
/**
* Represent a streaming application
*/
-class StreamApplication(override val name : String, val inputUserConfig: UserConfig, val dag: Graph[ProcessorDescription, PartitionerDescription])
+class StreamApplication(
+ override val name: String, val inputUserConfig: UserConfig,
+ val dag: Graph[ProcessorDescription, PartitionerDescription])
extends Application {
require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")
@@ -137,19 +147,21 @@ object StreamApplication {
private val hashPartitioner = new HashPartitioner()
private val LOG = LogUtil.getLogger(getClass)
- def apply[T <: Processor[Task], P <: Partitioner] (name : String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
- import Processor._
+ def apply[T <: Processor[Task], P <: Partitioner](
+ name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
+ import io.gearpump.streaming.Processor._
if (dag.hasCycle()) {
LOG.warn(s"Detected cycles in DAG of application $name!")
}
val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
- val graph = dag.mapVertex {processor =>
+ val graph = dag.mapVertex { processor =>
val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
updatedProcessor
}.mapEdge { (node1, edge, node2) =>
- PartitionerDescription(new PartitionerObject(Option(edge).getOrElse(StreamApplication.hashPartitioner)))
+ PartitionerDescription(new PartitionerObject(
+ Option(edge).getOrElse(StreamApplication.hashPartitioner)))
}
new StreamApplication(name, userConfig, graph)
}