You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:25 UTC
[15/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
deleted file mode 100644
index d0fea30..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.http.scaladsl.model.StatusCodes._
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.{Route, _}
-import akka.stream.ActorMaterializer
-import akka.util.Timeout
-import org.apache.commons.lang.exception.ExceptionUtils
-
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.util.{Constants, LogUtil}
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/** Contains all REST API service endpoints */
-class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem)
- extends RouteService {
-
- implicit val timeout = Constants.FUTURE_TIMEOUT
-
- private val config = system.settings.config
-
- private val jarStoreService = JarStoreService.get(config)
- jarStoreService.init(config, system)
-
- private val LOG = LogUtil.getLogger(getClass)
-
- private val securityEnabled = config.getBoolean(
- Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
-
- private val supervisorPath = system.settings.config.getString(
- Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
-
- private val myExceptionHandler: ExceptionHandler = ExceptionHandler {
- case ex: Throwable => {
- extractUri { uri =>
- LOG.error(s"Request to $uri could not be handled normally", ex)
- complete(InternalServerError, ExceptionUtils.getStackTrace(ex))
- }
- }
- }
-
- // Makes sure staticRoute is the final one, as it will try to lookup resource in local path
- // if there is no match in previous routes
- private val static = new StaticService(system, supervisorPath).route
-
- def supervisor: ActorRef = {
- if (supervisorPath == null || supervisorPath.isEmpty()) {
- null
- } else {
- val actorRef = system.actorSelection(supervisorPath).resolveOne()
- Await.result(actorRef, new Timeout(Duration.create(5, "seconds")).duration)
- }
- }
-
- override def route: Route = {
- if (securityEnabled) {
- val security = new SecurityService(services, system)
- handleExceptions(myExceptionHandler) {
- security.route ~ static
- }
- } else {
- handleExceptions(myExceptionHandler) {
- services.route ~ static
- }
- }
- }
-
- private def services: RouteService = {
-
- val admin = new AdminService(system)
- val masterService = new MasterService(master, jarStoreService, system)
- val worker = new WorkerService(master, system)
- val app = new AppMasterService(master, jarStoreService, system)
- val sup = new SupervisorService(master, supervisor, system)
-
- new RouteService {
- override def route: Route = {
- admin.route ~ sup.route ~ masterService.route ~ worker.route ~ app.route
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
deleted file mode 100644
index b1abf62..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair}
-import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri}
-import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet
-import akka.stream.Materializer
-import com.typesafe.config.Config
-import com.softwaremill.session.SessionDirectives._
-import com.softwaremill.session.SessionOptions._
-import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager}
-import upickle.default.write
-
-import io.gearpump.security.{Authenticator => BaseAuthenticator}
-import io.gearpump.services.SecurityService.{User, UserSession}
-import io.gearpump.services.security.oauth2.OAuth2Authenticator
-import io.gearpump.util.{Constants, LogUtil}
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/**
- * Security authentication endpoint.
- *
- * - When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection
- * - When user can be authenticated, but are not authorized to access certail resource, will
- * return a 405 AuthorizationFailedRejection.
- * - When web UI frontend receive 401, it should redirect the UI to login page.
- * - When web UI receive 405,it should display errors like
- * "current user is not authorized to access this resource."
- *
- * The Authenticator used is pluggable, the current Authenticator is resolved by looking up
- * config path [[io.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]].
- *
- * See [[io.gearpump.security.Authenticator]] to find more info on custom Authenticator.
- */
-class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService {
-
- // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box.
- private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump",
- params = Map.empty)
-
- val LOG = LogUtil.getLogger(getClass, "AUDIT")
-
- private val config = system.settings.config
- private val sessionConfig = SessionConfig.fromConfig(config)
- private implicit val sessionManager: SessionManager[UserSession] =
- new SessionManager[UserSession](sessionConfig)
-
- private val authenticator = {
- val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS))
- val constructor = clazz.getConstructor(classOf[Config])
- val authenticator = constructor.newInstance(config).asInstanceOf[BaseAuthenticator]
- authenticator
- }
-
- private def configToMap(config: Config, path: String) = {
- import scala.collection.JavaConverters._
- config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
- }
-
- private val oauth2Providers: Map[String, String] = {
- if (config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)) {
- val map = configToMap(config, Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS)
- map.keys.toList.map { key =>
- val iconPath = config.getString(s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$key.icon")
- (key, iconPath)
- }.toMap
- } else {
- Map.empty[String, String]
- }
- }
-
- private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext)
- : Future[Option[UserSession]] = {
- authenticator.authenticate(user, pass, ec).map { result =>
- if (result.authenticated) {
- Some(UserSession(user, result.permissionLevel))
- } else {
- None
- }
- }
- }
-
- private def rejectMissingCredentials: Route = {
- reject(AuthenticationFailedRejection(CredentialsMissing, challenge))
- }
-
- private def rejectWrongCredentials: Route = {
- reject(AuthenticationFailedRejection(CredentialsRejected, challenge))
- }
-
- private def requireAuthentication(inner: UserSession => Route): Route = {
- optionalSession(oneOff, usingCookiesOrHeaders) { sessionOption =>
- sessionOption match {
- case Some(session) => {
- inner(session)
- }
- case None =>
- rejectMissingCredentials
- }
- }
- }
-
- private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = {
- setSession(oneOff, usingCookies, session) {
- val user = session.user
- // Default: 1 day
- val maxAgeMs = 1000 * sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L)
- setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"),
- maxAge = Some(maxAgeMs))) {
- LOG.info(s"user $user login from $ip")
- if (redirectToRoot) {
- redirect(Uri("/"), StatusCodes.TemporaryRedirect)
- } else {
- complete(write(new User(user)))
- }
- }
- }
- }
-
- private def logout(user: UserSession, ip: String): Route = {
- invalidateSession(oneOff, usingCookies) { ctx =>
- LOG.info(s"user ${user.user} logout from $ip")
- ctx.complete(write(new User(user.user)))
- }
- }
-
- // Only admin are able to access operation like post/delete/put
- private def requireAuthorization(user: UserSession, route: => Route): Route = {
- // Valid user
- if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) {
- route
- } else {
- // Possibly a guest or not authenticated.
- (put | delete | post) {
- // Reject with 405 authorization error
- reject(AuthorizationFailedRejection)
- } ~
- get {
- route
- }
- }
- }
-
- private val unknownIp: Directive1[RemoteAddress] = {
- Directive[Tuple1[RemoteAddress]]{ inner =>
- inner(new Tuple1(RemoteAddress.Unknown))
- }
- }
-
- override val route: Route = {
-
- extractExecutionContext{implicit ec: ExecutionContext =>
- extractMaterializer{implicit mat: Materializer =>
- (extractClientIP | unknownIp) { ip =>
- pathPrefix("login") {
- pathEndOrSingleSlash {
- get {
- getFromResource("login/login.html")
- } ~
- post {
- // Guest account don't have permission to submit new application in UI
- formField(FieldMagnet('username.as[String])) {user: String =>
- formFields(FieldMagnet('password.as[String])) {pass: String =>
- val result = authenticate(user, pass)
- onSuccess(result) {
- case Some(session) =>
- login(session, ip.toString)
- case None =>
- rejectWrongCredentials
- }
- }
- }
- }
- } ~
- path ("oauth2" / "providers") {
- // Responds with a list of OAuth2 providers.
- complete(write(oauth2Providers))
- } ~
- // Support OAUTH Authentication
- pathPrefix ("oauth2"/ Segment) {providerName =>
- // Resolve OAUTH Authentication Provider
- val oauthService = OAuth2Authenticator.get(config, providerName, ec)
-
- if (oauthService == null) {
- // OAuth2 is disabled.
- complete(StatusCodes.NotFound)
- } else {
-
- def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = {
- val result = oauthService.authenticate(parameters)
- onComplete(result) {
- case Success(session) =>
- login(session, ip.toString, redirectToRoot = true)
- case Failure(ex) => {
- LOG.info(s"Failed to login user from ${ip.toString}", ex)
- rejectWrongCredentials
- }
- }
- }
-
- path ("authorize") {
- // Redirects to OAuth2 service provider for authorization.
- redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect)
- } ~
- path ("accesstoken") {
- post {
- // Guest account don't have permission to submit new application in UI
- formField(FieldMagnet('accesstoken.as[String])) {accesstoken: String =>
- loginWithOAuth2Parameters(Map("accesstoken" -> accesstoken))
- }
- }
- } ~
- path("callback") {
- // Login with authorization code or access token.
- parameterMap {parameters =>
- loginWithOAuth2Parameters(parameters)
- }
- }
- }
- }
- } ~
- path("logout") {
- post {
- requireAuthentication {session =>
- logout(session, ip.toString())
- }
- }
- } ~
- requireAuthentication {user =>
- requireAuthorization(user, inner.route)
- }
- }}}
-
- }
-}
-
-object SecurityService {
-
- val SESSION_MANAGER_KEY = "akka.http.session.server-secret"
-
- case class UserSession(user: String, permissionLevel: Int)
-
- object UserSession {
-
- private val User = "user"
- private val PermissionLevel = "permissionLevel"
-
- implicit def serializer: MultiValueSessionSerializer[UserSession] = {
- new MultiValueSessionSerializer[UserSession](
- toMap = {t: UserSession =>
- Map(User -> t.user, PermissionLevel -> t.permissionLevel.toString)
- },
- fromMap = {m: Map[String, String] =>
- if (m.contains(User)) {
- Try(UserSession(m(User), m(PermissionLevel).toInt))
- } else {
- Failure[UserSession](new Exception("Fail to parse session "))
- }
- }
- )
- }
- }
-
- case class User(user: String)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
deleted file mode 100644
index fc990a1..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.server.Directives._
-import akka.stream.Materializer
-
-import io.gearpump.util.Util
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/**
- * static resource files.
- */
-class StaticService(override val system: ActorSystem, supervisorPath: String)
- extends BasicService {
-
- private val version = Util.version
-
- protected override def prefix = Neutral
-
- override def cache: Boolean = true
-
- protected override def doRoute(implicit mat: Materializer) = {
- path("version") {
- get { ctx =>
- ctx.complete(version)
- }
- } ~
- // For YARN usage, we need to make sure supervisor-path
- // can be accessed without authentication.
- path("supervisor-actor-path") {
- get {
- complete(supervisorPath)
- }
- } ~
- pathEndOrSingleSlash {
- getFromResource("index.html")
- } ~
- path("favicon.ico") {
- complete(StatusCodes.NotFound)
- } ~
- pathPrefix("webjars") {
- get {
- getFromResourceDirectory("META-INF/resources/webjars")
- }
- } ~
- path(Rest) { path =>
- getFromResource("%s" format path)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
deleted file mode 100644
index c7f19e4..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.Route
-import akka.stream.Materializer
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.services.SupervisorService.{Path, Status}
-import io.gearpump.util.ActorUtil._
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/** Responsible for adding/removing machines. Typically it delegates to YARN. */
-class SupervisorService(
- val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem)
- extends BasicService {
-
- import upickle.default.write
-
- /**
- * TODO: Add additional check to ensure the user have enough authorization to
- * add/remove a worker machine
- */
- private def authorize(internal: Route): Route = {
- if (supervisor == null) {
- failWith(new Exception("API not enabled, cannot find a valid supervisor! " +
- "Please make sure Gearpump is running on top of YARN or other resource managers"))
- } else {
- internal
- }
- }
-
- protected override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") {
- pathEnd {
- get {
- val path = if (supervisor == null) {
- null
- } else {
- supervisor.path.toString
- }
- complete(write(Path(path)))
- }
- } ~
- path("status") {
- post {
- if (supervisor == null) {
- complete(write(Status(enabled = false)))
- } else {
- complete(write(Status(enabled = true)))
- }
- }
- } ~
- path("addworker" / IntNumber) { workerCount =>
- post {
- authorize {
- onComplete(askActor[CommandResult](supervisor, AddWorker(workerCount))) {
- case Success(value) =>
- complete(write(value))
- case Failure(ex) =>
- failWith(ex)
- }
- }
- }
- } ~
- path("removeworker" / Segment) { workerIdString =>
- post {
- authorize {
- val workerId = WorkerId.parse(workerIdString)
- def future(): Future[CommandResult] = {
- askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData =>
- val containerId = workerData.workerDescription.resourceManagerContainerId
- askActor[CommandResult](supervisor, RemoveWorker(containerId))
- }
- }
-
- onComplete[CommandResult](future()) {
- case Success(value) =>
- complete(write(value))
- case Failure(ex) =>
- failWith(ex)
- }
- }
- }
- }
- }
-}
-
-object SupervisorService {
- case class Status(enabled: Boolean)
-
- case class Path(path: String)
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
deleted file mode 100644
index 78cdb37..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.util.{Failure, Success}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.http.scaladsl.server.Directives._
-import akka.stream.Materializer
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ReadOption}
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.ActorUtil._
-import io.gearpump.util.Constants
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/** Service to handle worker related queries */
-class WorkerService(val master: ActorRef, override val system: ActorSystem)
- extends BasicService {
-
- import upickle.default.write
- private val systemConfig = system.settings.config
- private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
-
- protected override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) {
- workerIdString => {
- pathEnd {
- val workerId = WorkerId.parse(workerIdString)
- onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) {
- case Success(value: WorkerData) =>
- complete(write(value.workerDescription))
- case Failure(ex) => failWith(ex)
- }
- }
- }~
- path("config") {
- val workerId = WorkerId.parse(workerIdString)
- onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) {
- case Success(value: WorkerConfig) =>
- val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
- complete(config)
- case Failure(ex) =>
- failWith(ex)
- }
- } ~
- path("metrics" / RestPath ) { path =>
- val workerId = WorkerId.parse(workerIdString)
- parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
- val query = QueryHistoryMetrics(path.head.toString, readOption)
- onComplete(askWorker[HistoryMetrics](master, workerId, query)) {
- case Success(value) =>
- complete(write(value))
- case Failure(ex) =>
- failWith(ex)
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala b/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
deleted file mode 100644
index a647a8d..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.main
-
-import java.util.Random
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.server.Route
-import akka.stream.ActorMaterializer
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-import sun.misc.BASE64Encoder
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.services.{RestServices, SecurityService}
-import io.gearpump.util.LogUtil.ProcessType
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
-
-/** Command line to start UI server */
-object Services extends AkkaApp with ArgumentsParser {
-
- private val LOG = LogUtil.getLogger(getClass)
-
- override val options: Array[(String, CLIOption[Any])] = Array(
- "master" -> CLIOption("<host:port>", required = false),
- Gear.OPTION_CONFIG -> CLIOption("<provide a custom configuration file>", required = false),
- "supervisor" -> CLIOption("<Supervisor Actor Path>", required = false, Some("")))
-
- override val description = "UI Server"
-
- override def akkaConfig: Config = {
- ClusterConfig.ui()
- }
-
- override def help(): Unit = {
- // scalastyle:off println
- Console.println("UI Server")
- // scalastyle:on println
- }
-
- private var killFunction: Option[() => Unit] = None
-
- override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-
- val argConfig = parse(args)
- var akkaConf =
- if (argConfig.exists(Gear.OPTION_CONFIG)) {
- ClusterConfig.ui(argConfig.getString(Gear.OPTION_CONFIG))
- } else {
- inputAkkaConf
- }
-
- val LOG: Logger = {
- LogUtil.loadConfiguration(akkaConf, ProcessType.UI)
- LogUtil.getLogger(getClass)
- }
-
- if (argConfig.exists("master")) {
- val master = argConfig.getString("master")
- akkaConf = akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
- ConfigValueFactory.fromIterable(List(master).asJava))
- }
-
- akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH,
- ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor")))
- // Creates a random unique secret key for session manager.
- // All previous stored session token cookies will be invalidated when UI
- // server is restarted.
- .withValue(SecurityService.SESSION_MANAGER_KEY,
- ConfigValueFactory.fromAnyRef(randomSeverSecret()))
-
- val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
- .flatMap(Util.parseHostList)
-
- implicit val system = ActorSystem("services", akkaConf)
- implicit val executionContext = system.dispatcher
-
- import scala.concurrent.duration._
- val master = system.actorOf(MasterProxy.props(masterCluster, 1.day),
- s"masterproxy${system.name}")
- val (host, port) = parseHostPort(system.settings.config)
-
- implicit val mat = ActorMaterializer()
- val services = new RestServices(master, mat, system)
-
- val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port)
- Await.result(bindFuture, 15.seconds)
-
- val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host
- LOG.info(s"Please browse to http://$displayHost:$port to see the web UI")
-
- // scalastyle:off println
- println(s"Please browse to http://$displayHost:$port to see the web UI")
- // scalastyle:on println
-
- killFunction = Some { () =>
- LOG.info("Shutting down UI Server")
- system.terminate()
- }
-
- Await.result(system.whenTerminated, Duration.Inf)
- }
-
- private def randomSeverSecret(): String = {
- val random = new Random()
- val length = 64 // Required
- val bytes = new Array[Byte](length)
- random.nextBytes(bytes)
- val endecoder = new BASE64Encoder()
- endecoder.encode(bytes)
- }
-
- private def parseHostPort(config: Config): (String, Int) = {
- val port = config.getInt(Constants.GEARPUMP_SERVICE_HTTP)
- val host = config.getString(Constants.GEARPUMP_SERVICE_HOST)
- (host, port)
- }
-
- // TODO: fix this
- // Hacks around for YARN module, so that we can kill the UI server
- // when application is shutting down.
- def kill(): Unit = {
- if (killFunction.isDefined) {
- killFunction.get.apply()
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/package.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/package.scala b/services/jvm/src/main/scala/io/gearpump/services/package.scala
deleted file mode 100644
index 7b38134..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/package.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump
-
-package object services {
- final val REST_VERSION = "v1.0"
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
deleted file mode 100644
index 158d406..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import com.typesafe.config.Config
-
-import io.gearpump.services.SecurityService.UserSession
-import io.gearpump.util.Constants
-import io.gearpump.util.Constants._
-
-/**
- *
- * Uses OAuth2 social-login as the mechanism for authentication.
- * @see [[https://tools.ietf.org/html/rfc6749]] to find what is OAuth2, and how it works.
- *
- * Basically flow for OAuth2 Authentication:
- * 1. User accesses Gearpump UI website, and choose to login with OAuth2 server.
- * 2. Gearpump UI website redirects user to OAuth2 server domain authorization endpoint.
- * 3. End user complete the authorization in the domain of OAuth2 server.
- * 4. OAuth2 server redirects user back to Gearpump UI server.
- * 5. Gearpump UI server verify the tokens and extract credentials from query
- * parameters and form fields.
- *
- * NOTE: '''Thread-safety''' is a MUST requirement. Developer need to ensure the sub-class is
- * thread-safe. Sub-class should have a parameterless constructor.
- *
- * NOTE: OAuth2 Authenticator requires access of Internet. Please make sure HTTP proxy are
- * set properly if applied.
- *
- * Example: Config proxy when UI server is started on Windows:
- * {{{
- * > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
- * -Dhttps.proxyPort=8088
- * > bin\services
- * }}}
- *
- * Example: Config proxy when UI server is started on Linux:
- * {{{
- * $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
- * -Dhttps.proxyPort=8088"
- * $ bin/services
- * }}}
- */
-trait OAuth2Authenticator {
-
- /**
- * Inits authenticator with config which contains client ID, client secret, and etc..
- *
- * Typically, the client key and client secret is provided by OAuth2 Authorization server
- * when user register an application there.
- * See [[https://tools.ietf.org/html/rfc6749]] for definition of client, client Id,
- * and client secret.
- *
- * See [[https://developer.github.com/v3/oauth/]] for an actual example of how Github
- * use client key, and client secret.
- *
- * NOTE: '''Thread-Safety''': Framework ensures this call is synchronized.
- *
- * @param config Client Id, client secret, callback URL and etc..
- * @param executionContext ExecutionContext from hosting environment.
- */
- def init(config: Config, executionContext: ExecutionContext): Unit
-
- /**
- * Returns the OAuth Authorization URL so for redirection to that address to do OAuth2
- * authorization.
- *
- * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer
- * need to ensure thread safety.
- */
- def getAuthorizationUrl: String
-
- /**
- * After authorization, OAuth2 server redirects user back with tokens. This verify the
- * tokens, retrieve the profiles, and return [[UserSession]] information.
- *
- * NOTE: This is an Async call.
- *
- * NOTE: This call requires external internet access.
- *
- * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developer
- * need to ensure thread safety.
- *
- * @param parameters HTTP Query and Post parameters, which typically contains Authorization code.
- * @return UserSession if authentication pass.
- */
- def authenticate(parameters: Map[String, String]): Future[UserSession]
-
- /**
- * Clean up resource
- */
- def close(): Unit
-}
-
-object OAuth2Authenticator {
-
- // Serves as a quick immutable lookup cache
- private var providers = Map.empty[String, OAuth2Authenticator]
-
- /**
- * Load Authenticator from [[Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS]]
- *
- * @param provider, Name for the OAuth2 Authentication Service.
- * @return Returns null if the OAuth2 Authentication is disabled.
- */
- def get(config: Config, provider: String, executionContext: ExecutionContext)
- : OAuth2Authenticator = {
-
- if (providers.contains(provider)) {
- providers(provider)
- } else {
- val path = s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$provider"
- val enabled = config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)
- if (enabled && config.hasPath(path)) {
- this.synchronized {
- if (providers.contains(provider)) {
- providers(provider)
- } else {
- val authenticatorConfig = config.getConfig(path)
- val authenticatorClass = authenticatorConfig.getString(
- GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
- val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass)
- val authenticator = clazz.newInstance().asInstanceOf[OAuth2Authenticator]
- authenticator.init(authenticatorConfig, executionContext)
- providers += provider -> authenticator
- authenticator
- }
- }
- } else {
- null
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
deleted file mode 100644
index e422d3f..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2.impl
-
-import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.mutable.StringBuilder
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
-
-import com.typesafe.config.Config
-import com.github.scribejava.core.builder.ServiceBuilderAsync
-import com.github.scribejava.core.builder.api.DefaultApi20
-import com.github.scribejava.core.model._
-import com.github.scribejava.core.oauth.OAuth20Service
-import com.github.scribejava.core.utils.OAuthEncoder
-import com.ning.http.client.AsyncHttpClientConfig
-
-import io.gearpump.security.Authenticator
-import io.gearpump.services.SecurityService.UserSession
-import io.gearpump.services.security.oauth2.OAuth2Authenticator
-import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
-import io.gearpump.util.Constants._
-import io.gearpump.util.Util
-
-/**
- * Uses Ning AsyncClient to connect to OAuth2 service.
- *
- * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information.
- */
-abstract class BaseOAuth2Authenticator extends OAuth2Authenticator {
-
- // Authorize Url for end user to authorize
- protected def authorizeUrl: String
-
- // Used to fetch the Access Token.
- protected def accessTokenEndpoint: String
-
- // Protected resource Url to get the user profile
- protected def protectedResourceUrl: String
-
- // Extracts the username information from response of protectedResourceUrl
- protected def extractUserName(body: String): String
-
- // Scope required to access protectedResourceUrl
- protected def scope: String
-
- // OAuth2 endpoint definition for ScribeJava.
- protected def oauth2Api(): DefaultApi20 = {
- new BaseApi20(authorizeUrl, accessTokenEndpoint)
- }
-
- protected var oauthService: OAuth20Service = null
-
- protected var executionContext: ExecutionContext = null
-
- private var defaultPermissionLevel = Authenticator.Guest.permissionLevel
-
- // Synchronization ensured by the caller
- override def init(config: Config, executionContext: ExecutionContext): Unit = {
- if (this.oauthService == null) {
- val callback = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK)
- val clientId = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID)
- val clientSecret = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET)
- defaultPermissionLevel = {
- val role = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE)
- role match {
- case "guest" => Authenticator.Guest.permissionLevel
- case "user" => Authenticator.User.permissionLevel
- case "admin" => Authenticator.Admin.permissionLevel
- case _ => Authenticator.UnAuthenticated.permissionLevel
- }
- }
- this.oauthService = buildOAuth2Service(clientId, clientSecret, callback)
- this.executionContext = executionContext
- }
- }
-
- private val isClosed: AtomicBoolean = new AtomicBoolean(false)
-
- override def close(): Unit = {
- if (isClosed.compareAndSet(false, true)) {
- if (null != oauthService && null != oauthService.getAsyncHttpClient()) {
- oauthService.getAsyncHttpClient().close()
- }
- }
- }
-
- override def getAuthorizationUrl(): String = {
- oauthService.getAuthorizationUrl()
- }
-
- protected def authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = {
- val promise = Promise[UserSession]()
- val request = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, oauthService)
- oauthService.signRequest(accessToken, request)
- request.sendAsync {
- new OAuthAsyncRequestCallback[Response] {
- override def onCompleted(response: Response): Unit = {
- try {
- val user = extractUserName(response.getBody)
- promise.success(new UserSession(user, defaultPermissionLevel))
- } catch {
- case ex: Throwable =>
- promise.failure(ex)
- }
- }
-
- override def onThrowable(throwable: Throwable): Unit = {
- promise.failure(throwable)
- }
- }
- }
- promise.future
- }
-
- protected def authenticateWithAuthorizationCode(code: String): Future[UserSession] = {
-
- implicit val ec: ExecutionContext = executionContext
-
- val promise = Promise[UserSession]()
- oauthService.getAccessTokenAsync(code,
-
- new OAuthAsyncRequestCallback[OAuth2AccessToken] {
- override def onCompleted(accessToken: OAuth2AccessToken): Unit = {
- authenticateWithAccessToken(accessToken).onComplete {
- case Success(user) => promise.success(user)
- case Failure(ex) => promise.failure(ex)
- }
- }
-
- override def onThrowable(throwable: Throwable): Unit = {
- promise.failure(throwable)
- }
- })
- promise.future
- }
-
- override def authenticate(parameters: Map[String, String]): Future[UserSession] = {
-
- val code = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE)
- val accessToken = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN)
-
- if (accessToken.isDefined) {
- authenticateWithAccessToken(new OAuth2AccessToken(accessToken.get))
- } else if (code.isDefined) {
- authenticateWithAuthorizationCode(code.get)
- } else {
- // Fails authentication if code not exist
- Future.failed(new Exception("Fail to authenticate user as there is no code parameter in URL"))
- }
- }
-
- private def buildOAuth2Service(clientId: String, clientSecret: String, callback: String)
- : OAuth20Service = {
- val state: String = "state" + Util.randInt()
- ScribeJavaConfig.setForceTypeOfHttpRequests(
- ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS)
- val clientConfig: AsyncHttpClientConfig = new AsyncHttpClientConfig.Builder()
- .setMaxConnections(5)
- .setUseProxyProperties(true)
- .setRequestTimeout(60000)
- .setAllowPoolingConnections(false)
- .setPooledConnectionIdleTimeout(60000)
- .setReadTimeout(60000).build
-
- val service: OAuth20Service = new ServiceBuilderAsync()
- .apiKey(clientId)
- .apiSecret(clientSecret)
- .scope(scope)
- .state(state)
- .callback(callback)
- .asyncHttpClientConfig(clientConfig)
- .build(oauth2Api())
-
- service
- }
-}
-
-object BaseOAuth2Authenticator {
-
- class BaseApi20(authorizeUrl: String, accessTokenEndpoint: String) extends DefaultApi20 {
- def getAccessTokenEndpoint: String = {
- accessTokenEndpoint
- }
-
- def getAuthorizationUrl(config: OAuthConfig): String = {
- val sb: StringBuilder = new StringBuilder(String.format(authorizeUrl,
- config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback),
- OAuthEncoder.encode(config.getScope)))
- val state: String = config.getState
- if (state != null) {
- sb.append('&').append(OAuthConstants.STATE).append('=').append(OAuthEncoder.encode(state))
- }
- sb.toString
- }
-
- override def createService(config: OAuthConfig): OAuth20Service = {
- new OAuth20Service(this, config) {
-
- protected override def createAccessTokenRequest[T <: AbstractRequest](
- code: String, request: T): T = {
- super.createAccessTokenRequest(code, request)
-
- if (!getConfig.hasGrantType) {
- request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
- }
-
- // Work-around for issue https://github.com/scribejava/scribejava/issues/641
- request.addHeader("Content-Type", "application/x-www-form-urlencoded")
- request
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
deleted file mode 100644
index 98e37ea..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2.impl
-
-import scala.concurrent.{ExecutionContext, Future, Promise}
-
-import com.typesafe.config.Config
-import com.github.scribejava.core.builder.api.DefaultApi20
-import com.github.scribejava.core.model._
-import com.github.scribejava.core.oauth.OAuth20Service
-import com.ning.http.client
-import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient}
-import spray.json.{JsString, _}
-import sun.misc.BASE64Encoder
-
-import io.gearpump.services.SecurityService.UserSession
-import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
-import io.gearpump.util.Constants._
-
-/**
- *
- * Does authentication with CloudFoundry UAA service. Currently it only
- * extract the email address of end user.
- *
- * For what is UAA,
- * See [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]]
- * (User Account and Authentication Service)
- *
- * Pre-requisite steps to use this Authenticator:
- *
- * Step1: Register your website to UAA with tool uaac.
- * 1) Check tutorial on uaac at
- * [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]]
- *
- * 2) Open a bash shell, set the UAA server by command `uaac target`
- * {{{
- * uaac target [your uaa server url]
- * }}}
- *
- * NOTE: [your uaa server url] should match the uaahost settings in gear.conf
- *
- * 3) Login in as user admin by
- * {{{
- * uaac token client get admin -s MyAdminPassword
- * }}}
- *
- * 4) Create a new Application (Client) in UAA,
- * {{{
- * uaac client add [your_client_id]
- * --scope "openid cloud_controller.read"
- * --authorized_grant_types "authorization_code client_credentials refresh_token"
- * --authorities "openid cloud_controller.read"
- * --redirect_uri [your_redirect_url]
- * --autoapprove true
- * --secret [your_client_secret]
- * }}}
- *
- * Step2: Configure the OAuth2 information in gear.conf
- *
- * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
- * as true.
- *
- * 2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa"
- *
- * 3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section.
- * Please make sure class name, client ID, client Secret, and callback URL are set properly.
- *
- * NOTE: The callback URL here should match what you set on CloudFoundry UAA in step1.
- *
- * Step3: Restart the UI service and try the "social login" button for UAA.
- *
- * NOTE: OAuth requires Internet access, @see
- * [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find tutorials to
- * configure Internet proxy.
- *
- * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more background
- * information of OAuth2.
- */
-class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator {
-
- import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._
-
- private var host: String = null
-
- protected override def authorizeUrl: String =
- s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
-
- protected override def accessTokenEndpoint: String = s"$host/oauth/token"
-
- protected override def protectedResourceUrl: String = s"$host/userinfo"
-
- protected override def scope: String = "openid,cloud_controller.read"
-
- private var additionalAuthenticator: Option[AdditionalAuthenticator] = None
-
- override def init(config: Config, executionContext: ExecutionContext): Unit = {
- host = config.getString("uaahost")
- super.init(config, executionContext)
-
- if (config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)) {
- val additionalAuthenticatorConfig = config.getConfig(ADDITIONAL_AUTHENTICATOR)
- val authenticatorClass = additionalAuthenticatorConfig
- .getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
- val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass)
- val authenticator = clazz.newInstance().asInstanceOf[AdditionalAuthenticator]
- authenticator.init(additionalAuthenticatorConfig, executionContext)
- additionalAuthenticator = Option(authenticator)
- }
- }
-
- protected override def extractUserName(body: String): String = {
- val email = body.parseJson.asJsObject.fields("email").asInstanceOf[JsString]
- email.value
- }
-
- protected override def oauth2Api(): DefaultApi20 = {
- new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint)
- }
-
- protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken)
- : Future[UserSession] = {
-
- implicit val ec: ExecutionContext = executionContext
-
- if (additionalAuthenticator.isDefined) {
- super.authenticateWithAccessToken(accessToken).flatMap { user =>
- additionalAuthenticator.get.authenticate(oauthService.getAsyncHttpClient, accessToken, user)
- }
- } else {
- super.authenticateWithAccessToken(accessToken)
- }
- }
-}
-
-object CloudFoundryUAAOAuth2Authenticator {
- private val RESPONSE_TYPE = "response_type"
-
- val ADDITIONAL_AUTHENTICATOR_ENABLED = "additional-authenticator-enabled"
- val ADDITIONAL_AUTHENTICATOR = "additional-authenticator"
-
- private class CloudFoundryUAAService(authorizeUrl: String, accessTokenEndpoint: String)
- extends BaseApi20(authorizeUrl, accessTokenEndpoint) {
-
- private def base64(in: String): String = {
- val encoder = new BASE64Encoder()
- val utf8 = "UTF-8"
- encoder.encode(in.getBytes(utf8))
- }
-
- override def createService(config: OAuthConfig): OAuth20Service = {
- new OAuth20Service(this, config) {
-
- protected override def createAccessTokenRequest[T <: AbstractRequest](
- code: String, request: T): T = {
- val config: OAuthConfig = getConfig()
-
- request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
- request.addParameter(OAuthConstants.CODE, code)
- request.addParameter(RESPONSE_TYPE, "token")
- request.addParameter(OAuthConstants.REDIRECT_URI, config.getCallback)
-
- // Work around issue https://github.com/scribejava/scribejava/issues/641
- request.addHeader("Content-Type", "application/x-www-form-urlencoded")
-
- // CloudFoundry requires a Authorization header encoded with client Id and secret.
- val authorizationHeader = "Basic " + base64(config.getApiKey + ":" + config.getApiSecret)
- request.addHeader("Authorization", authorizationHeader)
- request
- }
- }
- }
- }
-
- /**
- * Additional authenticator to check more credential attributes of user before logging in.
- * This authenticator is applied AFTER user pass the initial (default) authenticator.
- */
- trait AdditionalAuthenticator {
-
- /**
- * Initialization
- *
- * @param config Configurations specifically used for this authenticator.
- * @param executionContext Execution Context to use to run futures.
- */
- def init(config: Config, executionContext: ExecutionContext): Unit
-
- /**
- *
- * @param accessToken, the accessToken for the UAA
- * @param user user session returned by previous authenticator
- * @return an updated UserSession
- */
- def authenticate(
- asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession)
- : Future[UserSession]
- }
-
- val ORGANIZATION_URL = "organization-url"
-
- class OrganizationAccessChecker extends AdditionalAuthenticator {
- private var organizationUrl: String = null
- private implicit var executionContext: ExecutionContext = null
-
- override def init(config: Config, executionContext: ExecutionContext): Unit = {
- this.organizationUrl = config.getString(ORGANIZATION_URL)
- this.executionContext = executionContext
- }
-
- override def authenticate(asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken,
- user: UserSession): Future[UserSession] = {
-
- val promise = Promise[UserSession]()
- val builder = asyncClient.prepareGet(organizationUrl)
- builder.addHeader("Authorization", s"bearer ${accessToken.getAccessToken}")
- builder.execute(new AsyncCompletionHandler[Unit] {
- override def onCompleted(response: client.Response): Unit = {
- if (response.getStatusCode == 200) {
- promise.success(user)
- } else {
- promise.failure(new Exception(response.getResponseBody))
- }
- }
- })
- promise.future
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
deleted file mode 100644
index c2b18fd..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2.impl
-
-import com.github.scribejava.apis.google.GoogleJsonTokenExtractor
-import com.github.scribejava.core.builder.api.DefaultApi20
-import com.github.scribejava.core.extractors.TokenExtractor
-import com.github.scribejava.core.model._
-import spray.json._
-
-/**
- *
- * Does authentication with Google OAuth2 service. It only extract the email address
- * from user profile of Google.
- *
- * Pre-requisite steps to use this Authenticator:
- *
- * Step1: Register your website as an OAuth2 Application on Google
- * 1) Create an application representing your website at [[https://console.developers.google.com]]
- *
- * 2) In "API Manager" of your created application, enable API "Google+ API"
- *
- * 3) Create OAuth client ID for this application. In "Credentials" tab of "API Manager",
- * choose "Create credentials", and then select OAuth client ID. Follow the wizard
- * to set callback URL, and generate client ID, and client Secret. Callback URL is NOT optional.
- *
- * Step2: Configure the OAuth2 information in gear.conf
- *
- * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
- * as true.
- *
- * 2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure
- * class name, client ID, client Secret, and callback URL are set properly.
- *
- * NOTE: callback URL set here should match what is configured on Google in step1.
- *
- * Step3: Restart the UI service and try out the Google social login button in UI.
- *
- * NOTE: OAuth requires Internet access, @see
- * [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find some helpful tutorials
- *
- * NOTE: Google use scope to define what data can be fetched by OAuth2. Currently we use profile
- * [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may change the profile
- * in future.
- *
- * TODO Currently, this doesn't verify the state from Google OAuth2 response.
- *
- * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information.
- */
-class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator {
-
- import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._
-
- protected override def authorizeUrl: String = AuthorizeUrl
-
- protected override def accessTokenEndpoint: String = AccessEndpoint
-
- protected override def protectedResourceUrl: String = ResourceUrl
-
- protected override def scope: String = GoogleOAuth2Authenticator.Scope
-
- protected override def extractUserName(body: String): String = {
- val emails = body.parseJson.asJsObject.fields("emails").asInstanceOf[JsArray]
- val email = emails.elements(0).asJsObject("Cannot find email account")
- .fields("value").asInstanceOf[JsString].value
- email
- }
-
- override def oauth2Api(): DefaultApi20 = new AsyncGoogleApi20(authorizeUrl, accessTokenEndpoint)
-}
-
-object GoogleOAuth2Authenticator {
-
- import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._
-
- // scalastyle:off line.size.limit
- val AuthorizeUrl = "https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
- // scalastyle:on line.size.limit
- val AccessEndpoint = "https://www.googleapis.com/oauth2/v4/token"
- val ResourceUrl = "https://www.googleapis.com/plus/v1/people/me"
- val Scope = "https://www.googleapis.com/auth/userinfo.email"
-
- private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String)
- extends BaseApi20(authorizeUrl, accessEndpoint) {
-
- override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] = {
- GoogleJsonTokenExtractor.instance
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
deleted file mode 100644
index f95474e..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.util
-
-import upickle.Js
-
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.Graph
-
-object UpickleUtil {
-
- // For implicit type, we need to add EXPLICIT return type, otherwise, upickle may NOT infer the
- // reader type automatically.
- // See issue https://github.com/lihaoyi/upickle-pprint/issues/102
- implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = {
- upickle.default.Reader[Graph[Int, String]] {
- case Js.Obj(verties, edges) =>
- val vertexList = upickle.default.readJs[List[Int]](verties._2)
- val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
- Graph(vertexList, edgeList)
- }
- }
-
- implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] {
- case Js.Str(str) =>
- WorkerId.parse(str)
- }
-
- implicit val workerIdWriter: upickle.default.Writer[WorkerId] = upickle.default.Writer[WorkerId] {
- case workerId: WorkerId =>
- Js.Str(WorkerId.render(workerId))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala
new file mode 100644
index 0000000..4d1ba22
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.stream.Materializer
+
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * AdminService is for cluster-wide managements. it is not related with
+ * specific application.
+ *
+ * For example:
+ * - Security management: Add user, remove user.
+ * - Configuration management: Change configurations.
+ * - Machine management: Add worker machines, remove worker machines, and add masters.
+ */
+
+// TODO: Add YARN resource manager capacities to add/remove machines.
+class AdminService(override val system: ActorSystem)
+ extends BasicService {
+
+ protected override def prefix = Neutral
+
+ protected override def doRoute(implicit mat: Materializer) = {
+ path("terminate") {
+ post {
+ system.terminate()
+ complete(StatusCodes.NotFound)
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
new file mode 100644
index 0000000..1ca2306
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.model.{FormData, Multipart}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
+import akka.stream.Materializer
+import upickle.default.{read, write}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppMasterSummary, GeneralAppMasterSummary}
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.services.AppMasterService.Status
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
+import org.apache.gearpump.streaming.appmaster.DagManager._
+import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
+import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
+import org.apache.gearpump.util.ActorUtil.{askActor, askAppMaster}
+import org.apache.gearpump.util.FileDirective._
+import org.apache.gearpump.util.{Constants, Util}
+
+/**
+ * Management service for AppMaster
+ */
+class AppMasterService(val master: ActorRef,
+ val jarStore: JarStoreService, override val system: ActorSystem)
+ extends BasicService {
+
+ private val systemConfig = system.settings.config
+ private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
+
+ protected override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) {
+ appId => {
+ path("dynamicdag") {
+ parameters(ParamMagnet("args")) { args: String =>
+ def replaceProcessor(dagOperation: DAGOperation): Route = {
+ onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) {
+ case Success(value) =>
+ complete(write(value))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+
+ val msg = java.net.URLDecoder.decode(args, "UTF-8")
+ val dagOperation = read[DAGOperation](msg)
+ (post & entity(as[Multipart.FormData])) { _ =>
+ uploadFile { form =>
+ val jar = form.getFile("jar").map(_.file)
+
+ if (jar.nonEmpty) {
+ dagOperation match {
+ case replace: ReplaceProcessor =>
+ val description = replace.newProcessorDescription.copy(jar =
+ Util.uploadJar(jar.get, jarStore))
+ val dagOperationWithJar = replace.copy(newProcessorDescription = description)
+ replaceProcessor(dagOperationWithJar)
+ }
+ } else {
+ replaceProcessor(dagOperation)
+ }
+ }
+ } ~ (post & entity(as[FormData])) { _ =>
+ replaceProcessor(dagOperation)
+ }
+ }
+ } ~
+ path("stallingtasks") {
+ onComplete(askAppMaster[StallingTasks](master, appId, GetStallingTasks(appId))) {
+ case Success(value) =>
+ complete(write(value))
+ case Failure(ex) => failWith(ex)
+ }
+ } ~
+ path("errors") {
+ onComplete(askAppMaster[LastFailure](master, appId, GetLastFailure(appId))) {
+ case Success(value) =>
+ complete(write(value))
+ case Failure(ex) => failWith(ex)
+ }
+ } ~
+ path("restart") {
+ post {
+ onComplete(askActor[SubmitApplicationResult](master, RestartApplication(appId))) {
+ case Success(_) =>
+ complete(write(Status(true)))
+ case Failure(ex) =>
+ complete(write(Status(false, ex.getMessage)))
+ }
+ }
+ } ~
+ path("config") {
+ onComplete(askActor[AppMasterConfig](master, QueryAppMasterConfig(appId))) {
+ case Success(value: AppMasterConfig) =>
+ val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
+ complete(config)
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ } ~
+ pathPrefix("executor" / Segment) { executorIdString =>
+ path("config") {
+ val executorId = Integer.parseInt(executorIdString)
+ onComplete(askAppMaster[ExecutorConfig](master, appId, QueryExecutorConfig(executorId))) {
+ case Success(value) =>
+ val config = Option(value.config).map(ClusterConfig.render(_, concise))
+ .getOrElse("{}")
+ complete(config)
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ } ~
+ pathEnd {
+ get {
+ val executorId = Integer.parseInt(executorIdString)
+ onComplete(askAppMaster[ExecutorSummary](master, appId,
+ GetExecutorSummary(executorId))) {
+ case Success(value) =>
+ complete(write(value))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+ }
+ } ~
+ path("metrics" / RestPath) { path =>
+ parameterMap { optionMap =>
+ parameter("aggregator" ? "") { aggregator =>
+ parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
+ val query = QueryHistoryMetrics(path.head.toString, readOption, aggregator, optionMap)
+ onComplete(askAppMaster[HistoryMetrics](master, appId, query)) {
+ case Success(value) =>
+ complete(write(value))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+ }
+ }
+ } ~
+ pathEnd {
+ get {
+ parameter("detail" ? "false") { detail =>
+ val queryDetails = Try(detail.toBoolean).getOrElse(false)
+ val request = AppMasterDataDetailRequest(appId)
+ queryDetails match {
+ case true =>
+ onComplete(askAppMaster[AppMasterSummary](master, appId, request)) {
+ case Success(value) =>
+ value match {
+ case data: GeneralAppMasterSummary =>
+ complete(write(data))
+ case data: StreamAppMasterSummary =>
+ complete(write(data))
+ }
+ case Failure(ex) =>
+ failWith(ex)
+ }
+
+ case false =>
+ onComplete(askActor[AppMasterData](master, AppMasterDataRequest(appId))) {
+ case Success(value) =>
+ complete(write(value))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+ }
+ }
+ } ~
+ pathEnd {
+ delete {
+ val writer = (result: ShutdownApplicationResult) => {
+ val output = if (result.appId.isSuccess) {
+ Map("status" -> "success", "info" -> null)
+ } else {
+ Map("status" -> "fail", "info" -> result.appId.failed.get.toString)
+ }
+ write(output)
+ }
+ onComplete(askActor[ShutdownApplicationResult](master, ShutdownApplication(appId))) {
+ case Success(result) =>
+ val output = if (result.appId.isSuccess) {
+ Map("status" -> "success", "info" -> null)
+ } else {
+ Map("status" -> "fail", "info" -> result.appId.failed.get.toString)
+ }
+ complete(write(output))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+ }
+ }
+ }
+}
+
+object AppMasterService {
+ case class Status(success: Boolean, reason: String = null)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala
new file mode 100644
index 0000000..3846c33
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.ExecutionContext
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.headers.CacheDirectives.{`max-age`, `no-cache`}
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.stream.Materializer
+
+import org.apache.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+trait RouteService {
+ def route: Route
+}
+
+/**
+ * Wraps the cache behavior, and some common utils.
+ */
+trait BasicService extends RouteService {
+
+ implicit def system: ActorSystem
+
+ implicit def timeout: akka.util.Timeout = Constants.FUTURE_TIMEOUT
+
+ implicit def ec: ExecutionContext = system.dispatcher
+
+ protected def doRoute(implicit mat: Materializer): Route
+
+ protected def prefix = Slash ~ "api" / s"$REST_VERSION"
+
+ protected val LOG = LogUtil.getLogger(getClass)
+
+ protected def cache = false
+ private val noCacheHeader = `Cache-Control`(`no-cache`, `max-age`(0L))
+
+ def route: Route = encodeResponse {
+ extractMaterializer { implicit mat =>
+ rawPathPrefix(prefix) {
+ if (cache) {
+ doRoute(mat)
+ } else {
+ respondWithHeader(noCacheHeader) {
+ doRoute(mat)
+ }
+ }
+ }
+ }
+ }
+}