You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:24 UTC
[14/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
new file mode 100644
index 0000000..32c9f08
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import java.io.{File, IOException}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+import java.nio.file.StandardOpenOption.{APPEND, WRITE}
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
+import akka.http.scaladsl.unmarshalling.Unmarshaller._
+import akka.stream.Materializer
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, SubmitApplicationResultValue}
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.worker.WorkerSummary
+import org.apache.gearpump.cluster.{ClusterConfig, UserConfig}
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription}
+import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication}
+import org.apache.gearpump.util.ActorUtil._
+import org.apache.gearpump.util.FileDirective._
+import org.apache.gearpump.util.{Constants, Graph, Util}
+
+/** Manages service for master node */
+class MasterService(val master: ActorRef,
+ val jarStore: JarStoreService, override val system: ActorSystem)
+ extends BasicService {
+
+ import upickle.default.{read, write}
+
+ private val systemConfig = system.settings.config
+ private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
+
+ protected override def doRoute(implicit mat: Materializer) = pathPrefix("master") { // all routes below sit under "master"
+ pathEnd { // "master" itself: master data, GET only
+ get {
+ onComplete(askActor[MasterData](master, GetMasterData)) {
+ case Success(value: MasterData) => complete(write(value))
+ case Failure(ex) => failWith(ex)
+ }
+ }
+ } ~
+ path("applist") { // data of all application masters
+ onComplete(askActor[AppMastersData](master, AppMastersDataRequest)) {
+ case Success(value: AppMastersData) =>
+ complete(write(value))
+ case Failure(ex) => failWith(ex)
+ }
+ } ~
+ path("workerlist") { // summaries of all workers known to the master
+ def future: Future[List[WorkerSummary]] = askActor[WorkerList](master, GetAllWorkers)
+ .flatMap { workerList =>
+ val workers = workerList.workers
+ val workerDataList = List.empty[WorkerSummary]
+ // Ask every listed worker for its data and append each worker description to the result.
+ Future.fold(workers.map { workerId =>
+ askWorker[WorkerData](master, workerId, GetWorkerData(workerId))
+ })(workerDataList) { (workerDataList, workerData) =>
+ workerDataList :+ workerData.workerDescription
+ }
+ }
+ onComplete(future) {
+ case Success(result: List[WorkerSummary]) => complete(write(result))
+ case Failure(ex) => failWith(ex)
+ }
+ } ~
+ path("config") { // rendered master config; "{}" when the reply carries a null config
+ onComplete(askActor[MasterConfig](master, QueryMasterConfig)) {
+ case Success(value: MasterConfig) =>
+ val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
+ complete(config)
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ } ~
+ path("metrics" / RestPath) { path => // history metrics, queried with the head of the remaining path
+ parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String =>
+ val query = QueryHistoryMetrics(path.head.toString, readOption)
+ onComplete(askActor[HistoryMetrics](master, query)) {
+ case Success(value) =>
+ complete(write(value))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+ } ~
+ path("submitapp") { // uploaded form: submits a native application
+ post {
+ uploadFile { form =>
+ val jar = form.getFile("jar").map(_.file)
+ val configFile = form.getFile("configfile").map(_.file)
+ val configString = form.getValue("configstring").getOrElse("")
+ val executorCount = form.getValue("executorcount").getOrElse("1").toInt
+ val args = form.getValue("args").getOrElse("")
+ // NOTE(review): a non-numeric "executorcount" makes toInt above throw NumberFormatException.
+ val mergedConfigFile = mergeConfig(configFile, configString)
+
+ onComplete(Future(
+ MasterService.submitGearApp(jar, executorCount, args, systemConfig, mergedConfigFile)
+ )) {
+ case Success(success) =>
+ val response = MasterService.AppSubmissionResult(success)
+ complete(write(response))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+ }
+ } ~
+ path("submitstormapp") { // uploaded form: submits a Storm application
+ post {
+ uploadFile { form =>
+ val jar = form.getFile("jar").map(_.file)
+ val configFile = form.getFile("configfile").map(_.file)
+ val args = form.getValue("args").getOrElse("")
+ onComplete(Future(
+ MasterService.submitStormApp(jar, configFile, args, systemConfig)
+ )) {
+ case Success(success) =>
+ val response = MasterService.AppSubmissionResult(success)
+ complete(write(response))
+ case Failure(ex) =>
+ failWith(ex)
+ }
+ }
+ }
+ } ~
+ path("submitdag") { // body is URL-decoded, then read as a SubmitApplicationRequest
+ post {
+ entity(as[String]) { request =>
+ val msg = java.net.URLDecoder.decode(request, "UTF-8")
+ val submitApplicationRequest = read[SubmitApplicationRequest](msg)
+ import submitApplicationRequest.{appName, dag, processors, userconfig}
+ val context = ClientContext(system.settings.config, system, master)
+ // Vertex ids are replaced by their processor descriptions; edges are wrapped as partitioners.
+ val graph = dag.mapVertex { processorId =>
+ processors(processorId)
+ }.mapEdge { (node1, edge, node2) =>
+ PartitionerDescription(new PartitionerByClassName(edge))
+ }
+
+ val effectiveConfig = if (userconfig == null) UserConfig.empty else userconfig
+ val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph))
+
+ import upickle.default.write
+ val submitApplicationResultValue = SubmitApplicationResultValue(appId)
+ val jsonData = write(submitApplicationResultValue)
+ complete(jsonData)
+ }
+ }
+ } ~
+ path("uploadjar") { // passes the uploaded jar to Util.uploadJar with the jar store
+ uploadFile { form =>
+ val jar = form.getFile("jar").map(_.file)
+ if (jar.isEmpty) {
+ complete(write(
+ MasterService.Status(success = false, reason = "Jar file not found")))
+ } else {
+ val jarFile = Util.uploadJar(jar.get, jarStore)
+ complete(write(jarFile))
+ }
+ }
+ } ~
+ path("partitioners") { // names of Constants.BUILTIN_PARTITIONERS
+ get {
+ complete(write(BuiltinPartitioners(Constants.BUILTIN_PARTITIONERS.map(_.getName))))
+ }
+ }
+ }
+
+ /**
+ * Adds the inline configString to the user config: appended on a new line to the uploaded
+ * configFile if present, else written to a new temp file. Null/empty leaves configFile as is.
+ */
+ private def mergeConfig(configFile: Option[File], configString: String): Option[File] = {
+ if (configString == null || configString.isEmpty) {
+ configFile
+ } else {
+ // The temp-file prefix used to begin with a stray '"', an illegal file name char on Windows.
+ val target = configFile.getOrElse(File.createTempFile("userfile_configstring_", ".conf"))
+ val text = if (configFile.isDefined) "\n" + configString else configString
+ Files.write(target.toPath, text.getBytes(UTF_8), if (configFile.isDefined) APPEND else WRITE)
+ Some(target)
+ }
+ }
+}
+
+object MasterService {
+
+ case class BuiltinPartitioners(partitioners: Array[String])
+
+ case class AppSubmissionResult(success: Boolean)
+
+ case class Status(success: Boolean, reason: String = null)
+
+ /**
+ * Submits Native Application.
+ */
+ def submitGearApp(
+ jar: Option[File], executorNum: Int, args: String,
+ systemConfig: Config, userConfigFile: Option[File]): Boolean = {
+ // Only an uploaded jar ends up in the file map; a missing jar is rejected by the submit step.
+ val uploadedFiles = Map("jar" -> jar).collect { case (name, Some(file)) => name -> file }
+ val submitterArgs = Array("-executors", executorNum.toString) ++
+ spaceSeparatedArgumentsToArray(args)
+ submitAndDeleteTempFiles(
+ "org.apache.gearpump.cluster.main.AppSubmitter", submitterArgs, uploadedFiles,
+ getUserApplicationClassPath, systemConfig, userConfigFile
+ )
+ }
+
+ /**
+ * Submits Storm application.
+ */
+ def submitStormApp(
+ jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = {
+ // Keeps only the files that were actually uploaded, under the keys "jar" and "config".
+ val uploadedFiles = Map("jar" -> jar, "config" -> stormConf)
+ .collect { case (name, Some(file)) => name -> file }
+ submitAndDeleteTempFiles(
+ "org.apache.gearpump.experiments.storm.main.GearpumpStormClient",
+ spaceSeparatedArgumentsToArray(args), uploadedFiles,
+ getStormApplicationClassPath, systemConfig, userConfigFile = None
+ )
+ }
+
+ private def submitAndDeleteTempFiles(
+ mainClass: String, argsArray: Array[String], fileMap: Map[String, File],
+ classPath: Array[String], systemConfig: Config,
+ userConfigFile: Option[File] = None): Boolean = {
+ // Launches mainClass via Util.startProcess and reads its exit value. Returns true on exit
+ // code 0; a missing "jar" entry or a non-zero exit code raises IOException. The files in
+ // fileMap and userConfigFile are deleted whether or not the submission succeeded.
+ try {
+ if (!fileMap.contains("jar")) {
+ throw new IOException("JAR file not supplied")
+ }
+
+ val submission = Util.startProcess(
+ clusterOptions(systemConfig, userConfigFile),
+ classPath,
+ mainClass,
+ arguments = createFilePathArgArray(fileMap) ++ argsArray
+ )
+
+ val exitCode = submission.exitValue()
+ if (exitCode != 0) {
+ throw new IOException(s"Process exit abnormally with exit code $exitCode.\n" +
+ s"Error message: ${submission.logger.error}")
+ }
+ true
+ } finally {
+ fileMap.values.foreach(_.delete)
+ userConfigFile.foreach(_.delete())
+ }
+ }
+
+ /**
+ * Returns Java options for gearpump cluster
+ */
+ private def clusterOptions(systemConfig: Config, userConfigFile: Option[File]): Array[String] = {
+ // Fixed -D options: home and hostname copied from systemConfig, plus the IPv4 preference.
+ val baseOptions = Array(
+ s"-D${Constants.GEARPUMP_HOME}=${systemConfig.getString(Constants.GEARPUMP_HOME)}",
+ s"-D${Constants.GEARPUMP_HOSTNAME}=${systemConfig.getString(Constants.GEARPUMP_HOSTNAME)}",
+ s"-D${Constants.PREFER_IPV4}=true"
+ )
+ // One indexed masters option per host parsed from the configured master list.
+ val masterOptions = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+ .toList.flatMap(Util.parseHostList).zipWithIndex.map { case (master, index) =>
+ s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.$index=${master.host}:${master.port}"
+ }
+ // Custom config file option, present only when a user config file was supplied.
+ val customConfigOption = userConfigFile.toList.map { file =>
+ s"-D${Constants.GEARPUMP_CUSTOM_CONFIG_FILE}=${file.getPath}"
+ }
+ baseOptions ++ masterOptions ++ customConfigOption
+ }
+
+ /**
+ * Filter all defined file paths and store their config key and path into an array.
+ */
+ private def createFilePathArgArray(fileMap: Map[String, File]): Array[String] = {
+ // Each entry yields two consecutive arguments: "-<key>" followed by the file's path.
+ val flagsAndPaths = fileMap.toSeq.flatMap { case (key, file) =>
+ Seq(s"-$key", file.getPath)
+ }
+ flagsAndPaths.toArray
+ }
+
+ /**
+ * Splits `str` on runs of spaces, dropping the empty token a leading space would produce.
+ */
+ private def spaceSeparatedArgumentsToArray(str: String): Array[String] = {
+ str.split(" +").filterNot(_.isEmpty)
+ }
+
+ private val homeDir = System.getProperty(Constants.GEARPUMP_HOME) + "/"
+ private val libHomeDir = homeDir + "lib/"
+
+ private def getUserApplicationClassPath: Array[String] = {
+ // The conf directory under the home dir comes first, followed by the daemon, yarn and
+ // top-level entries of the lib directory, each given as a wildcard.
+ val libEntries = Array("daemon/*", "yarn/*", "*").map { pattern =>
+ libHomeDir + pattern
+ }
+ (homeDir + "conf") +: libEntries
+ }
+
+ private def getStormApplicationClassPath: Array[String] = {
+ // The user application class path with the storm wildcard entry appended.
+ val stormJars = libHomeDir + "storm/*"
+ getUserApplicationClassPath :+ stormJars
+ }
+
+ case class SubmitApplicationRequest(
+ appName: String,
+ processors: Map[ProcessorId, ProcessorDescription],
+ dag: Graph[Int, String],
+ userconfig: UserConfig)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
new file mode 100644
index 0000000..87f9e34
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.model.StatusCodes._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{Route, _}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import org.apache.commons.lang.exception.ExceptionUtils
+
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/** Contains all REST API service endpoints */
+class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem)
+ extends RouteService {
+
+ implicit val timeout = Constants.FUTURE_TIMEOUT
+
+ private val config = system.settings.config
+
+ private val jarStoreService = JarStoreService.get(config)
+ jarStoreService.init(config, system)
+
+ private val LOG = LogUtil.getLogger(getClass)
+
+ private val securityEnabled = config.getBoolean(
+ Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
+
+ private val supervisorPath = system.settings.config.getString(
+ Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
+
+ private val myExceptionHandler: ExceptionHandler = ExceptionHandler {
+ case ex: Throwable => {
+ extractUri { uri =>
+ LOG.error(s"Request to $uri could not be handled normally", ex)
+ complete(InternalServerError, ExceptionUtils.getStackTrace(ex))
+ }
+ }
+ }
+
+ // Makes sure staticRoute is the final one, as it will try to lookup resource in local path
+ // if there is no match in previous routes
+ private val static = new StaticService(system, supervisorPath).route
+
+ def supervisor: ActorRef = {
+ // Resolves the actor at supervisorPath, blocking for up to 5 seconds.
+ // Yields null when the path is null or empty.
+ Option(supervisorPath).filter(_.nonEmpty) match {
+ case Some(path) => Await.result(system.actorSelection(path).resolveOne(), 5.seconds)
+ case None => null
+ }
+ }
+
+ override def route: Route = {
+ // Service routes (wrapped by SecurityService when authentication is enabled) come first and
+ // the static route last, all guarded by the shared exception handler.
+ if (!securityEnabled) {
+ handleExceptions(myExceptionHandler) { services.route ~ static }
+ } else {
+ val secured = new SecurityService(services, system)
+ handleExceptions(myExceptionHandler) {
+ secured.route ~ static
+ }
+ }
+ }
+
+ private def services: RouteService = {
+ // Each call creates a new instance of every service below.
+ val admin = new AdminService(system)
+ val masterService = new MasterService(master, jarStoreService, system)
+ val worker = new WorkerService(master, system)
+ val app = new AppMasterService(master, jarStoreService, system)
+ val sup = new SupervisorService(master, supervisor, system) // supervisor is null when no path is set
+ // The combined route tries the services in this order: admin, supervisor, master, worker, app.
+ new RouteService {
+ override def route: Route = {
+ admin.route ~ sup.route ~ masterService.route ~ worker.route ~ app.route
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
new file mode 100644
index 0000000..804b34f
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair}
+import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri}
+import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet
+import akka.stream.Materializer
+import com.typesafe.config.Config
+import com.softwaremill.session.SessionDirectives._
+import com.softwaremill.session.SessionOptions._
+import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager}
+import upickle.default.write
+
+import org.apache.gearpump.security.{Authenticator => BaseAuthenticator}
+import org.apache.gearpump.services.SecurityService.{User, UserSession}
+import org.apache.gearpump.services.security.oauth2.OAuth2Authenticator
+import org.apache.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * Security authentication endpoint.
+ *
+ * - When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection.
+ * - When user can be authenticated, but is not authorized to access a certain resource, will
+ * reject with AuthorizationFailedRejection (403 Forbidden by akka-http's default handler).
+ * - When web UI frontend receives 401, it should redirect the UI to the login page.
+ * - When web UI receives 403, it should display errors like
+ * "current user is not authorized to access this resource."
+ *
+ * The Authenticator used is pluggable, the current Authenticator is resolved by looking up
+ * config path [[org.apache.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]].
+ *
+ * See [[org.apache.gearpump.security.Authenticator]] to find more info on custom Authenticator.
+ */
+class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService {
+
+ // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box.
+ private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump",
+ params = Map.empty)
+
+ val LOG = LogUtil.getLogger(getClass, "AUDIT")
+
+ private val config = system.settings.config
+ private val sessionConfig = SessionConfig.fromConfig(config)
+ private implicit val sessionManager: SessionManager[UserSession] =
+ new SessionManager[UserSession](sessionConfig)
+
+ private val authenticator = {
+ val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS))
+ val constructor = clazz.getConstructor(classOf[Config])
+ val authenticator = constructor.newInstance(config).asInstanceOf[BaseAuthenticator]
+ authenticator
+ }
+
+ private def configToMap(config: Config, path: String) = {
+ import scala.collection.JavaConverters._
+ config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
+ }
+
+ private val oauth2Providers: Map[String, String] = {
+ if (config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)) {
+ val map = configToMap(config, Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS)
+ map.keys.toList.map { key =>
+ val iconPath = config.getString(s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$key.icon")
+ (key, iconPath)
+ }.toMap
+ } else {
+ Map.empty[String, String]
+ }
+ }
+
+ private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext)
+ : Future[Option[UserSession]] = {
+ // Asks the configured authenticator to check the credentials. The future yields a session
+ // carrying the reported permission level when they are accepted, and None otherwise;
+ // a failure of the authenticator's own future is passed through untouched.
+ for (result <- authenticator.authenticate(user, pass, ec)) yield {
+ if (result.authenticated) Some(UserSession(user, result.permissionLevel))
+ else None
+ }
+ }
+
+ private def rejectMissingCredentials: Route = {
+ reject(AuthenticationFailedRejection(CredentialsMissing, challenge))
+ }
+
+ private def rejectWrongCredentials: Route = {
+ reject(AuthenticationFailedRejection(CredentialsRejected, challenge))
+ }
+
+ private def requireAuthentication(inner: UserSession => Route): Route = {
+ // Looks for a one-off session in the request's cookies or headers. With a session the inner
+ // route is built from it; without one the request is rejected as missing credentials.
+ optionalSession(oneOff, usingCookiesOrHeaders) {
+ case Some(session) =>
+ inner(session)
+ case None =>
+ rejectMissingCredentials
+ }
+ }
+ // NOTE(review): the session lookup accepts cookies or headers, while login writes cookies only.
+
+ // Starts the session for session.user, sets a "username" cookie, logs the login and answers
+ // with a redirect to "/" (when redirectToRoot) or the user serialized by write.
+ private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = {
+ setSession(oneOff, usingCookies, session) {
+ val user = session.user
+ // Cookie Max-Age is in seconds (RFC 6265), so the value is no longer scaled by 1000, which
+ // kept the cookie 1000 times longer than intended. Default: 1 day
+ val maxAgeSeconds = sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L)
+ setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"),
+ maxAge = Some(maxAgeSeconds))) {
+ LOG.info(s"user $user login from $ip")
+ if (redirectToRoot) redirect(Uri("/"), StatusCodes.TemporaryRedirect)
+ else complete(write(new User(user)))
+ }
+ }
+ }
+
+ private def logout(user: UserSession, ip: String): Route = { // ends the cookie session of `user`
+ invalidateSession(oneOff, usingCookies) { ctx =>
+ LOG.info(s"user ${user.user} logout from $ip")
+ ctx.complete(write(new User(user.user))) // answers with the logged-out user serialized by write
+ }
+ }
+
+ // Users at or above BaseAuthenticator.User level get the route for any method; others only for GET.
+ private def requireAuthorization(user: UserSession, route: => Route): Route = {
+ // Valid user
+ if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) {
+ route
+ } else {
+ // Possibly a guest or not authenticated.
+ (put | delete | post) {
+ // Reject with an authorization failure (AuthorizationFailedRejection)
+ reject(AuthorizationFailedRejection)
+ } ~
+ get {
+ route
+ }
+ }
+ }
+
+ private val unknownIp: Directive1[RemoteAddress] = {
+ Directive[Tuple1[RemoteAddress]]{ inner =>
+ inner(new Tuple1(RemoteAddress.Unknown))
+ }
+ }
+
+ override val route: Route = {
+ // Serves the login page and the login/logout endpoints; every other request needs a session.
+ extractExecutionContext{implicit ec: ExecutionContext =>
+ extractMaterializer{implicit mat: Materializer =>
+ (extractClientIP | unknownIp) { ip =>
+ pathPrefix("login") {
+ pathEndOrSingleSlash {
+ get {
+ getFromResource("login/login.html")
+ } ~
+ post {
+ // Authenticates the submitted "username" and "password" form fields.
+ formField(FieldMagnet('username.as[String])) {user: String =>
+ formFields(FieldMagnet('password.as[String])) {pass: String =>
+ val result = authenticate(user, pass)
+ onSuccess(result) {
+ case Some(session) =>
+ login(session, ip.toString)
+ case None =>
+ rejectWrongCredentials
+ }
+ }
+ }
+ }
+ } ~
+ path ("oauth2" / "providers") {
+ // Responds with a list of OAuth2 providers.
+ complete(write(oauth2Providers))
+ } ~
+ // Support OAUTH Authentication
+ pathPrefix ("oauth2"/ Segment) {providerName =>
+ // Resolve OAUTH Authentication Provider
+ val oauthService = OAuth2Authenticator.get(config, providerName, ec)
+
+ if (oauthService == null) {
+ // OAuth2 is disabled.
+ complete(StatusCodes.NotFound)
+ } else {
+ // Shared by the "accesstoken" and "callback" routes: a failed authentication is logged and rejected.
+ def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = {
+ val result = oauthService.authenticate(parameters)
+ onComplete(result) {
+ case Success(session) =>
+ login(session, ip.toString, redirectToRoot = true)
+ case Failure(ex) => {
+ LOG.info(s"Failed to login user from ${ip.toString}", ex)
+ rejectWrongCredentials
+ }
+ }
+ }
+
+ path ("authorize") {
+ // Redirects to OAuth2 service provider for authorization.
+ redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect)
+ } ~
+ path ("accesstoken") {
+ post {
+ // Logs in with the access token submitted as the "accesstoken" form field.
+ formField(FieldMagnet('accesstoken.as[String])) {accesstoken: String =>
+ loginWithOAuth2Parameters(Map("accesstoken" -> accesstoken))
+ }
+ }
+ } ~
+ path("callback") {
+ // Login with authorization code or access token.
+ parameterMap {parameters =>
+ loginWithOAuth2Parameters(parameters)
+ }
+ }
+ }
+ }
+ } ~
+ path("logout") {
+ post {
+ requireAuthentication {session =>
+ logout(session, ip.toString())
+ }
+ }
+ } ~
+ requireAuthentication {user => // everything else: session required, then the authorization check
+ requireAuthorization(user, inner.route)
+ }
+ }}}
+
+ }
+}
+
+object SecurityService {
+
+ val SESSION_MANAGER_KEY = "akka.http.session.server-secret"
+
+ case class UserSession(user: String, permissionLevel: Int)
+
+ object UserSession {
+
+ private val User = "user"
+ private val PermissionLevel = "permissionLevel"
+
+ implicit def serializer: MultiValueSessionSerializer[UserSession] = {
+ // Writes a session as the two string fields "user" and "permissionLevel".
+ def encode(session: UserSession): Map[String, String] = {
+ Map(User -> session.user, PermissionLevel -> session.permissionLevel.toString)
+ }
+
+ // Reads it back; a missing user, or a missing or non-numeric permission level, is a Failure.
+ def decode(fields: Map[String, String]): Try[UserSession] = fields.get(User) match {
+ case Some(name) => Try(UserSession(name, fields(PermissionLevel).toInt))
+ case None => Failure[UserSession](new Exception("Fail to parse session "))
+ }
+
+ new MultiValueSessionSerializer[UserSession](toMap = encode, fromMap = decode)
+ }
+ }
+
+ case class User(user: String)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
new file mode 100644
index 0000000..284d3f2
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.stream.Materializer
+
+import org.apache.gearpump.util.Util
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * static resource files.
+ */
+class StaticService(override val system: ActorSystem, supervisorPath: String)
+ extends BasicService {
+
+ private val version = Util.version
+
+ protected override def prefix = Neutral
+
+ override def cache: Boolean = true
+
+ protected override def doRoute(implicit mat: Materializer) = {
+ val versionRoute = path("version") {
+ get {
+ complete(version)
+ }
+ }
+ // For YARN usage, we need to make sure supervisor-path
+ // can be accessed without authentication.
+ val supervisorPathRoute = path("supervisor-actor-path") {
+ get {
+ complete(supervisorPath)
+ }
+ }
+ val indexRoute = pathEndOrSingleSlash {
+ getFromResource("index.html")
+ }
+ val faviconRoute = path("favicon.ico") {
+ complete(StatusCodes.NotFound)
+ }
+ val webjarsRoute = pathPrefix("webjars") {
+ get { getFromResourceDirectory("META-INF/resources/webjars") }
+ }
+ // Anything else is looked up as a resource named after the rest of the path.
+ val fallbackRoute = path(Rest) { resource =>
+ getFromResource(resource)
+ }
+ versionRoute ~ supervisorPathRoute ~ indexRoute ~ faviconRoute ~ webjarsRoute ~ fallbackRoute
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala
new file mode 100644
index 0000000..fecf5ad
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.stream.Materializer
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.services.SupervisorService.{Path, Status}
+import org.apache.gearpump.util.ActorUtil._
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/** Responsible for adding/removing machines. Typically it delegates to YARN. */
+class SupervisorService(
+    val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem)
+  extends BasicService {
+
+  import upickle.default.write
+
+  /**
+   * Guards a route that needs a supervisor: the given route is returned when a
+   * supervisor reference is present, otherwise the request fails with an exception.
+   *
+   * TODO: Add additional check to ensure the user have enough authorization to
+   * add/remove a worker machine
+   */
+  private def authorize(internal: Route): Route = {
+    if (supervisor != null) {
+      internal
+    } else {
+      failWith(new Exception("API not enabled, cannot find a valid supervisor! " +
+        "Please make sure Gearpump is running on top of YARN or other resource managers"))
+    }
+  }
+
+  /**
+   * Routes below "supervisor":
+   *  - GET  (path end): the supervisor actor path, null when there is no supervisor
+   *  - POST status: whether a supervisor is available
+   *  - POST addworker/<count>: asks the supervisor to add workers
+   *  - POST removeworker/<workerId>: asks the supervisor to remove the worker's container
+   */
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") {
+    pathEnd {
+      get {
+        val supervisorPath = Option(supervisor).map(_.path.toString).orNull
+        complete(write(Path(supervisorPath)))
+      }
+    } ~
+    path("status") {
+      post {
+        complete(write(Status(enabled = supervisor != null)))
+      }
+    } ~
+    path("addworker" / IntNumber) { workerCount =>
+      post {
+        authorize {
+          onComplete(askActor[CommandResult](supervisor, AddWorker(workerCount))) {
+            case Failure(ex) => failWith(ex)
+            case Success(value) => complete(write(value))
+          }
+        }
+      }
+    } ~
+    path("removeworker" / Segment) { workerIdString =>
+      post {
+        authorize {
+          val workerId = WorkerId.parse(workerIdString)
+
+          // Kept as a method so that nothing is sent until onComplete evaluates it:
+          // first resolve the worker's resource manager container, then ask the
+          // supervisor to remove that container.
+          def removeWorkerContainer(): Future[CommandResult] = {
+            val workerData = askWorker[WorkerData](master, workerId, GetWorkerData(workerId))
+            workerData.flatMap { data =>
+              val containerId = data.workerDescription.resourceManagerContainerId
+              askActor[CommandResult](supervisor, RemoveWorker(containerId))
+            }
+          }
+
+          onComplete[CommandResult](removeWorkerContainer()) {
+            case Failure(ex) => failWith(ex)
+            case Success(value) => complete(write(value))
+          }
+        }
+      }
+    }
+  }
+}
+
+object SupervisorService {
+
+  /** Response payload carrying a supervisor actor path as text. */
+  case class Path(path: String)
+
+  /** Response payload carrying a single enabled/disabled flag. */
+  case class Status(enabled: Boolean)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
new file mode 100644
index 0000000..8268d61
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.util.{Failure, Success}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.server.Directives._
+import akka.stream.Materializer
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ReadOption}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.ActorUtil._
+import org.apache.gearpump.util.Constants
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/** Service to handle worker related queries */
+class WorkerService(val master: ActorRef, override val system: ActorSystem)
+  extends BasicService {
+
+  import upickle.default.write
+
+  // Passed to ClusterConfig.render when a worker's configuration is returned.
+  private val concise =
+    system.settings.config.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
+
+  /**
+   * Routes below "worker/<workerId>":
+   *  - (path end): the worker description
+   *  - config: the rendered worker configuration, "{}" when the worker reports none
+   *  - metrics/<path>: history metrics for the first segment of <path>
+   */
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) {
+    workerIdString =>
+      // Each sub-route parses the id itself, and only once that sub-route has matched.
+      def parseWorkerId(): WorkerId = WorkerId.parse(workerIdString)
+
+      val descriptionRoute = pathEnd {
+        val workerId = parseWorkerId()
+        onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) {
+          case Success(value: WorkerData) => complete(write(value.workerDescription))
+          case Failure(ex) => failWith(ex)
+        }
+      }
+
+      val configRoute = path("config") {
+        val workerId = parseWorkerId()
+        onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) {
+          case Success(value: WorkerConfig) =>
+            val rendered = Option(value.config).fold("{}")(ClusterConfig.render(_, concise))
+            complete(rendered)
+          case Failure(ex) => failWith(ex)
+        }
+      }
+
+      val metricsRoute = path("metrics" / RestPath) { metricPath =>
+        val workerId = parseWorkerId()
+        parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
+          val query = QueryHistoryMetrics(metricPath.head.toString, readOption)
+          onComplete(askWorker[HistoryMetrics](master, workerId, query)) {
+            case Success(value) => complete(write(value))
+            case Failure(ex) => failWith(ex)
+          }
+        }
+      }
+
+      descriptionRoute ~ configRoute ~ metricsRoute
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala
new file mode 100644
index 0000000..23c95c4
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.main
+
+import java.util.Random
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.server.Route
+import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+import sun.misc.BASE64Encoder
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear}
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.services.{RestServices, SecurityService}
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+
+/** Command line to start UI server */
+object Services extends AkkaApp with ArgumentsParser {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "master" -> CLIOption("<host:port>", required = false),
+    Gear.OPTION_CONFIG -> CLIOption("<provide a custom configuration file>", required = false),
+    "supervisor" -> CLIOption("<Supervisor Actor Path>", required = false, Some("")))
+
+  override val description = "UI Server"
+
+  override def akkaConfig: Config = {
+    ClusterConfig.ui()
+  }
+
+  override def help(): Unit = {
+    // scalastyle:off println
+    Console.println("UI Server")
+    // scalastyle:on println
+  }
+
+  // Set by main() once the server is up; invoked by kill().
+  private var killFunction: Option[() => Unit] = None
+
+  /**
+   * Starts the UI server and blocks until its actor system terminates.
+   *
+   * @param inputAkkaConf configuration used unless a custom configuration file is given
+   *                      through the [[Gear.OPTION_CONFIG]] argument
+   * @param args command line arguments, see [[options]]
+   */
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+
+    val argConfig = parse(args)
+    var akkaConf =
+      if (argConfig.exists(Gear.OPTION_CONFIG)) {
+        ClusterConfig.ui(argConfig.getString(Gear.OPTION_CONFIG))
+      } else {
+        inputAkkaConf
+      }
+
+    // Shadows the object-level logger: the logging configuration is loaded first.
+    val LOG: Logger = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.UI)
+      LogUtil.getLogger(getClass)
+    }
+
+    if (argConfig.exists("master")) {
+      val master = argConfig.getString("master")
+      akkaConf = akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
+        ConfigValueFactory.fromIterable(List(master).asJava))
+    }
+
+    akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH,
+      ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor")))
+      // Creates a random unique secret key for session manager.
+      // All previous stored session token cookies will be invalidated when UI
+      // server is restarted.
+      .withValue(SecurityService.SESSION_MANAGER_KEY,
+        ConfigValueFactory.fromAnyRef(randomServerSecret()))
+
+    val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .flatMap(Util.parseHostList)
+
+    implicit val system = ActorSystem("services", akkaConf)
+    implicit val executionContext = system.dispatcher
+
+    import scala.concurrent.duration._
+    val master = system.actorOf(MasterProxy.props(masterCluster, 1.day),
+      s"masterproxy${system.name}")
+    val (host, port) = parseHostPort(system.settings.config)
+
+    implicit val mat = ActorMaterializer()
+    val services = new RestServices(master, mat, system)
+
+    val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port)
+    Await.result(bindFuture, 15.seconds)
+
+    val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host
+    LOG.info(s"Please browse to http://$displayHost:$port to see the web UI")
+
+    // scalastyle:off println
+    println(s"Please browse to http://$displayHost:$port to see the web UI")
+    // scalastyle:on println
+
+    killFunction = Some { () =>
+      LOG.info("Shutting down UI Server")
+      system.terminate()
+    }
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  /**
+   * Generates the Base64-encoded secret key handed to the session manager.
+   *
+   * The key protects session tokens, so it must not be guessable. A plain
+   * `java.util.Random` is predictable from a few outputs; `SecureRandom` is a
+   * subclass of it backed by a cryptographically strong generator.
+   */
+  private def randomServerSecret(): String = {
+    val random: Random = new java.security.SecureRandom()
+    val length = 64 // Required
+    val bytes = new Array[Byte](length)
+    random.nextBytes(bytes)
+    // NOTE(review): sun.misc.BASE64Encoder is a JDK-internal class; consider
+    // java.util.Base64 once Java 8 is the minimum supported runtime.
+    new BASE64Encoder().encode(bytes)
+  }
+
+  /** Reads the (host, port) pair the HTTP server binds to from the configuration. */
+  private def parseHostPort(config: Config): (String, Int) = {
+    val port = config.getInt(Constants.GEARPUMP_SERVICE_HTTP)
+    val host = config.getString(Constants.GEARPUMP_SERVICE_HOST)
+    (host, port)
+  }
+
+  // TODO: fix this
+  // Hacks around for YARN module, so that we can kill the UI server
+  // when application is shutting down.
+  // Does nothing when the server has not been started yet.
+  def kill(): Unit = {
+    killFunction.foreach(shutdown => shutdown())
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala
new file mode 100644
index 0000000..12dec1d
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump
+
+package object services {
+ // Version tag of the REST API.
+ // NOTE(review): presumably used as a URL path segment by the services - confirm against
+ // callers. Deliberately left without a type annotation so it remains a constant value
+ // definition.
+ final val REST_VERSION = "v1.0"
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala
new file mode 100644
index 0000000..e9008aa
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.services.SecurityService.UserSession
+import org.apache.gearpump.util.Constants
+import org.apache.gearpump.util.Constants._
+
+/**
+ *
+ * Uses OAuth2 social-login as the mechanism for authentication.
+ * @see [[https://tools.ietf.org/html/rfc6749]] to find out what OAuth2 is and how it works.
+ *
+ * Basic flow of OAuth2 authentication:
+ * 1. The user accesses the Gearpump UI website and chooses to log in with an OAuth2 server.
+ * 2. The Gearpump UI website redirects the user to the authorization endpoint in the
+ * OAuth2 server's domain.
+ * 3. The end user completes the authorization in the domain of the OAuth2 server.
+ * 4. The OAuth2 server redirects the user back to the Gearpump UI server.
+ * 5. The Gearpump UI server verifies the tokens and extracts credentials from query
+ * parameters and form fields.
+ *
+ * NOTE: '''Thread-safety''' is a MUST requirement. Developers need to ensure that the
+ * sub-class is thread-safe. The sub-class should have a parameterless constructor.
+ *
+ * NOTE: The OAuth2 Authenticator requires Internet access. Please make sure the HTTP proxy
+ * is set properly if one is needed.
+ *
+ * Example: Config proxy when UI server is started on Windows:
+ * {{{
+ * > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
+ * -Dhttps.proxyPort=8088
+ * > bin\services
+ * }}}
+ *
+ * Example: Config proxy when UI server is started on Linux:
+ * {{{
+ * $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
+ * -Dhttps.proxyPort=8088"
+ * $ bin/services
+ * }}}
+ */
+trait OAuth2Authenticator {
+
+ /**
+ * Initializes the authenticator with a config that contains the client ID, client
+ * secret, etc.
+ *
+ * Typically, the client key and client secret are provided by the OAuth2 Authorization
+ * server when a user registers an application there.
+ * See [[https://tools.ietf.org/html/rfc6749]] for the definition of client, client Id,
+ * and client secret.
+ *
+ * See [[https://developer.github.com/v3/oauth/]] for an actual example of how Github
+ * uses the client key and client secret.
+ *
+ * NOTE: '''Thread-Safety''': Framework ensures this call is synchronized.
+ *
+ * @param config Client Id, client secret, callback URL, etc.
+ * @param executionContext ExecutionContext from hosting environment.
+ */
+ def init(config: Config, executionContext: ExecutionContext): Unit
+
+ /**
+ * Returns the OAuth2 authorization URL; the user is redirected to that address to do
+ * the OAuth2 authorization.
+ *
+ * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developers
+ * need to ensure thread safety.
+ */
+ def getAuthorizationUrl: String
+
+ /**
+ * After authorization, the OAuth2 server redirects the user back with tokens. This
+ * verifies the tokens, retrieves the profile, and returns the [[UserSession]] information.
+ *
+ * NOTE: This is an Async call.
+ *
+ * NOTE: This call requires external internet access.
+ *
+ * NOTE: '''Thread-Safety''': This can be called in a multi-thread environment. Developers
+ * need to ensure thread safety.
+ *
+ * @param parameters HTTP query and post parameters, which typically contain the
+ * authorization code.
+ * @return UserSession if authentication passes.
+ */
+ def authenticate(parameters: Map[String, String]): Future[UserSession]
+
+ /**
+ * Cleans up resources.
+ */
+ def close(): Unit
+}
+
+object OAuth2Authenticator {
+
+  // Serves as a quick immutable lookup cache. The map itself is never mutated; a new map
+  // replaces it on every registration. It is read without the lock on the fast path, so the
+  // reference is @volatile: a thread that sees the new map also sees the authenticator that
+  // was initialized before the map was published.
+  @volatile private var providers = Map.empty[String, OAuth2Authenticator]
+
+  /**
+   * Load Authenticator from [[Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS]]
+   *
+   * The authenticator for a provider is instantiated and initialized at most once, under
+   * this object's lock, and is then served from the cache.
+   *
+   * @param config configuration holding the OAuth2 settings of the UI server.
+   * @param provider Name for the OAuth2 Authentication Service.
+   * @param executionContext passed to the authenticator's `init`.
+   * @return Returns null if the OAuth2 Authentication is disabled, or if no authenticator
+   *         is configured for the provider.
+   */
+  def get(config: Config, provider: String, executionContext: ExecutionContext)
+    : OAuth2Authenticator = {
+
+    providers.get(provider) match {
+      case Some(cached) =>
+        cached
+      case None =>
+        val path = s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$provider"
+        val enabled = config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)
+        if (enabled && config.hasPath(path)) {
+          this.synchronized {
+            // Check again under the lock: another thread may have registered the
+            // provider between the unlocked lookup above and acquiring the lock.
+            providers.get(provider) match {
+              case Some(registered) =>
+                registered
+              case None =>
+                val authenticatorConfig = config.getConfig(path)
+                val authenticatorClass = authenticatorConfig.getString(
+                  GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
+                val clazz = Thread.currentThread().getContextClassLoader
+                  .loadClass(authenticatorClass)
+                val authenticator = clazz.newInstance().asInstanceOf[OAuth2Authenticator]
+                // Initialize before publishing so readers never see a half-built instance.
+                authenticator.init(authenticatorConfig, executionContext)
+                providers += provider -> authenticator
+                authenticator
+            }
+          }
+        } else {
+          null
+        }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
new file mode 100644
index 0000000..603fb1d
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2.impl
+
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable.StringBuilder
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+import com.typesafe.config.Config
+import com.github.scribejava.core.builder.ServiceBuilderAsync
+import com.github.scribejava.core.builder.api.DefaultApi20
+import com.github.scribejava.core.model._
+import com.github.scribejava.core.oauth.OAuth20Service
+import com.github.scribejava.core.utils.OAuthEncoder
+import com.ning.http.client.AsyncHttpClientConfig
+
+import org.apache.gearpump.security.Authenticator
+import org.apache.gearpump.services.SecurityService.UserSession
+import org.apache.gearpump.services.security.oauth2.OAuth2Authenticator
+import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.Util
+
+/**
+ * Uses Ning AsyncClient to connect to OAuth2 service.
+ *
+ * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]]
+ * for more API information.
+ */
+abstract class BaseOAuth2Authenticator extends OAuth2Authenticator {
+
+  // Authorize Url for end user to authorize
+  protected def authorizeUrl: String
+
+  // Used to fetch the Access Token.
+  protected def accessTokenEndpoint: String
+
+  // Protected resource Url to get the user profile
+  protected def protectedResourceUrl: String
+
+  // Extracts the username information from response of protectedResourceUrl
+  protected def extractUserName(body: String): String
+
+  // Scope required to access protectedResourceUrl
+  protected def scope: String
+
+  // OAuth2 endpoint definition for ScribeJava.
+  protected def oauth2Api(): DefaultApi20 = {
+    new BaseApi20(authorizeUrl, accessTokenEndpoint)
+  }
+
+  // Both are null until init() has run.
+  protected var oauthService: OAuth20Service = null
+
+  protected var executionContext: ExecutionContext = null
+
+  // Permission level given to every authenticated user; overwritten by init().
+  private var defaultPermissionLevel = Authenticator.Guest.permissionLevel
+
+  /**
+   * Builds the OAuth2 service from the configured callback, client id and client secret,
+   * and resolves the default permission level from the configured role. Calling it again
+   * after a successful initialization is a no-op.
+   *
+   * Synchronization ensured by the caller
+   */
+  override def init(config: Config, executionContext: ExecutionContext): Unit = {
+    if (this.oauthService == null) {
+      val callback = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK)
+      val clientId = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID)
+      val clientSecret = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET)
+      defaultPermissionLevel = {
+        val role = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE)
+        role match {
+          case "guest" => Authenticator.Guest.permissionLevel
+          case "user" => Authenticator.User.permissionLevel
+          case "admin" => Authenticator.Admin.permissionLevel
+          // Any unrecognized role falls back to the unauthenticated level.
+          case _ => Authenticator.UnAuthenticated.permissionLevel
+        }
+      }
+      this.oauthService = buildOAuth2Service(clientId, clientSecret, callback)
+      this.executionContext = executionContext
+    }
+  }
+
+  private val isClosed: AtomicBoolean = new AtomicBoolean(false)
+
+  /** Closes the underlying async HTTP client; only the first call has an effect. */
+  override def close(): Unit = {
+    if (isClosed.compareAndSet(false, true)) {
+      if (null != oauthService && null != oauthService.getAsyncHttpClient()) {
+        oauthService.getAsyncHttpClient().close()
+      }
+    }
+  }
+
+  override def getAuthorizationUrl(): String = {
+    oauthService.getAuthorizationUrl()
+  }
+
+  /**
+   * Fetches the protected resource with the given access token and turns the response
+   * body into a [[UserSession]] carrying the default permission level.
+   *
+   * @return a future that fails when the request fails or the user name cannot be extracted
+   */
+  protected def authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = {
+    val promise = Promise[UserSession]()
+    val request = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, oauthService)
+    oauthService.signRequest(accessToken, request)
+    request.sendAsync {
+      new OAuthAsyncRequestCallback[Response] {
+        override def onCompleted(response: Response): Unit = {
+          try {
+            val user = extractUserName(response.getBody)
+            promise.success(new UserSession(user, defaultPermissionLevel))
+          } catch {
+            // Whatever goes wrong while reading the profile is reported through the future.
+            case ex: Throwable =>
+              promise.failure(ex)
+          }
+        }
+
+        override def onThrowable(throwable: Throwable): Unit = {
+          promise.failure(throwable)
+        }
+      }
+    }
+    promise.future
+  }
+
+  /**
+   * Exchanges the authorization code for an access token, then authenticates with it.
+   *
+   * @return a future that is completed in every case: with the user session, or with the
+   *         failure of either step
+   */
+  protected def authenticateWithAuthorizationCode(code: String): Future[UserSession] = {
+
+    implicit val ec: ExecutionContext = executionContext
+
+    val promise = Promise[UserSession]()
+    oauthService.getAccessTokenAsync(code,
+
+      new OAuthAsyncRequestCallback[OAuth2AccessToken] {
+        override def onCompleted(accessToken: OAuth2AccessToken): Unit = {
+          try {
+            authenticateWithAccessToken(accessToken).onComplete {
+              case Success(user) => promise.success(user)
+              case Failure(ex) => promise.failure(ex)
+            }
+          } catch {
+            // authenticateWithAccessToken can also throw synchronously (for example while
+            // signing or sending the request). This runs on the HTTP client's callback, so
+            // without failing the promise here the caller's future would never complete.
+            case scala.util.control.NonFatal(ex) =>
+              promise.tryFailure(ex)
+              ()
+          }
+        }
+
+        override def onThrowable(throwable: Throwable): Unit = {
+          promise.failure(throwable)
+        }
+      })
+    promise.future
+  }
+
+  /**
+   * Authenticates with the access token parameter when present, otherwise with the
+   * authorization code parameter; fails when neither is supplied.
+   */
+  override def authenticate(parameters: Map[String, String]): Future[UserSession] = {
+
+    val code = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE)
+    val accessToken = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN)
+
+    (accessToken, code) match {
+      case (Some(token), _) =>
+        authenticateWithAccessToken(new OAuth2AccessToken(token))
+      case (None, Some(authorizationCode)) =>
+        authenticateWithAuthorizationCode(authorizationCode)
+      case (None, None) =>
+        // Fails authentication if code not exist
+        Future.failed(
+          new Exception("Fail to authenticate user as there is no code parameter in URL"))
+    }
+  }
+
+  /** Builds the async ScribeJava service used for all calls to the OAuth2 server. */
+  private def buildOAuth2Service(clientId: String, clientSecret: String, callback: String)
+    : OAuth20Service = {
+    val state: String = "state" + Util.randInt()
+    ScribeJavaConfig.setForceTypeOfHttpRequests(
+      ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS)
+    val clientConfig: AsyncHttpClientConfig = new AsyncHttpClientConfig.Builder()
+      .setMaxConnections(5)
+      .setUseProxyProperties(true)
+      .setRequestTimeout(60000)
+      .setAllowPoolingConnections(false)
+      .setPooledConnectionIdleTimeout(60000)
+      .setReadTimeout(60000).build
+
+    val service: OAuth20Service = new ServiceBuilderAsync()
+      .apiKey(clientId)
+      .apiSecret(clientSecret)
+      .scope(scope)
+      .state(state)
+      .callback(callback)
+      .asyncHttpClientConfig(clientConfig)
+      .build(oauth2Api())
+
+    service
+  }
+}
+
+object BaseOAuth2Authenticator {
+
+  /**
+   * ScribeJava API definition driven by two endpoint strings.
+   *
+   * @param authorizeUrl format string of the authorization URL; it is filled with, in
+   *                     order, the response type, the API key, the encoded callback and
+   *                     the encoded scope
+   * @param accessTokenEndpoint returned as-is by `getAccessTokenEndpoint`
+   */
+  class BaseApi20(authorizeUrl: String, accessTokenEndpoint: String) extends DefaultApi20 {
+
+    def getAccessTokenEndpoint: String = accessTokenEndpoint
+
+    def getAuthorizationUrl(config: OAuthConfig): String = {
+      val url: StringBuilder = new StringBuilder(String.format(authorizeUrl,
+        config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback),
+        OAuthEncoder.encode(config.getScope)))
+      // The state parameter is appended only when the config carries one.
+      Option(config.getState).foreach { state =>
+        url.append('&').append(OAuthConstants.STATE).append('=')
+          .append(OAuthEncoder.encode(state))
+      }
+      url.toString
+    }
+
+    override def createService(config: OAuthConfig): OAuth20Service = {
+      new OAuth20Service(this, config) {
+
+        // Adds the grant type when the config does not define one, and forces the
+        // form-urlencoded content type on the access token request.
+        protected override def createAccessTokenRequest[T <: AbstractRequest](
+            code: String, request: T): T = {
+          super.createAccessTokenRequest(code, request)
+
+          val grantTypeMissing = !getConfig.hasGrantType
+          if (grantTypeMissing) {
+            request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
+          }
+
+          // Work-around for issue https://github.com/scribejava/scribejava/issues/641
+          request.addHeader("Content-Type", "application/x-www-form-urlencoded")
+          request
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
new file mode 100644
index 0000000..ded50a7
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2.impl
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+import com.typesafe.config.Config
+import com.github.scribejava.core.builder.api.DefaultApi20
+import com.github.scribejava.core.model._
+import com.github.scribejava.core.oauth.OAuth20Service
+import com.ning.http.client
+import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient}
+import spray.json.{JsString, _}
+import sun.misc.BASE64Encoder
+
+import org.apache.gearpump.services.SecurityService.UserSession
+import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
+import org.apache.gearpump.util.Constants._
+
+/**
+ *
+ * Does authentication with the CloudFoundry UAA service. Currently it only
+ * extracts the email address of the end user.
+ *
+ * For what is UAA,
+ * See [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]]
+ * (User Account and Authentication Service)
+ *
+ * Pre-requisite steps to use this Authenticator:
+ *
+ * Step1: Register your website to UAA with tool uaac.
+ * 1) Check tutorial on uaac at
+ * [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]]
+ *
+ * 2) Open a bash shell, set the UAA server by command `uaac target`
+ * {{{
+ * uaac target [your uaa server url]
+ * }}}
+ *
+ * NOTE: [your uaa server url] should match the uaahost settings in gear.conf
+ *
+ * 3) Log in as user admin by
+ * {{{
+ * uaac token client get admin -s MyAdminPassword
+ * }}}
+ *
+ * 4) Create a new Application (Client) in UAA,
+ * {{{
+ * uaac client add [your_client_id]
+ * --scope "openid cloud_controller.read"
+ * --authorized_grant_types "authorization_code client_credentials refresh_token"
+ * --authorities "openid cloud_controller.read"
+ * --redirect_uri [your_redirect_url]
+ * --autoapprove true
+ * --secret [your_client_secret]
+ * }}}
+ *
+ * Step2: Configure the OAuth2 information in gear.conf
+ *
+ * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
+ * as true.
+ *
+ * 2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa"
+ *
+ * 3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section.
+ * Please make sure class name, client ID, client Secret, and callback URL are set properly.
+ *
+ * NOTE: The callback URL here should match what you set on CloudFoundry UAA in step1.
+ *
+ * Step3: Restart the UI service and try the "social login" button for UAA.
+ *
+ * NOTE: OAuth requires Internet access, @see
+ * [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] to find tutorials to
+ * configure Internet proxy.
+ *
+ * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] for more background
+ * information of OAuth2.
+ */
+class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator {
+
+  import org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._
+
+  // Base URL of the UAA server; assigned from the "uaahost" setting in init().
+  private var host: String = null
+
+  // Optional second-stage check; populated in init() when enabled in the configuration.
+  private var additionalAuthenticator: Option[AdditionalAuthenticator] = None
+
+  protected override def authorizeUrl: String =
+    s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
+
+  protected override def accessTokenEndpoint: String = s"$host/oauth/token"
+
+  protected override def protectedResourceUrl: String = s"$host/userinfo"
+
+  protected override def scope: String = "openid,cloud_controller.read"
+
+  /**
+   * Reads the UAA host, initializes the base authenticator and, when
+   * "additional-authenticator-enabled" is true, loads the configured additional authenticator.
+   *
+   * @param config configuration section of this authenticator
+   * @param executionContext execution context handed to the base class and to the
+   *   additional authenticator
+   */
+  override def init(config: Config, executionContext: ExecutionContext): Unit = {
+    // host must be set before super.init, since the URL members above are derived from it.
+    host = config.getString("uaahost")
+    super.init(config, executionContext)
+
+    val additionalCheckEnabled = config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)
+    if (additionalCheckEnabled) {
+      val loaded = loadAdditionalAuthenticator(
+        config.getConfig(ADDITIONAL_AUTHENTICATOR), executionContext)
+      additionalAuthenticator = Option(loaded)
+    }
+  }
+
+  /** Instantiates, via the context class loader, the class named in `authConfig` and inits it. */
+  private def loadAdditionalAuthenticator(
+      authConfig: Config, executionContext: ExecutionContext): AdditionalAuthenticator = {
+    val className = authConfig.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
+    val loadedClass = Thread.currentThread().getContextClassLoader.loadClass(className)
+    val instance = loadedClass.newInstance().asInstanceOf[AdditionalAuthenticator]
+    instance.init(authConfig, executionContext)
+    instance
+  }
+
+  /** Returns the string value of the "email" field of the JSON object in `body`. */
+  protected override def extractUserName(body: String): String = {
+    val profile = body.parseJson.asJsObject
+    profile.fields("email").asInstanceOf[JsString].value
+  }
+
+  protected override def oauth2Api(): DefaultApi20 = {
+    new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint)
+  }
+
+  /**
+   * Runs the base authentication and, if an additional authenticator is configured, passes
+   * the resulting session through it as well.
+   */
+  protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken)
+    : Future[UserSession] = {
+
+    implicit val ec: ExecutionContext = executionContext
+
+    val baseSession = super.authenticateWithAccessToken(accessToken)
+    additionalAuthenticator match {
+      case Some(extraCheck) =>
+        baseSession.flatMap { user =>
+          extraCheck.authenticate(oauthService.getAsyncHttpClient, accessToken, user)
+        }
+      case None =>
+        baseSession
+    }
+  }
+}
+
+object CloudFoundryUAAOAuth2Authenticator {
+  private val RESPONSE_TYPE = "response_type"
+
+  val ADDITIONAL_AUTHENTICATOR_ENABLED = "additional-authenticator-enabled"
+  val ADDITIONAL_AUTHENTICATOR = "additional-authenticator"
+
+  /**
+   * OAuth2 API definition for CloudFoundry UAA. Its access token request carries the client
+   * credentials in an HTTP Basic "Authorization" header.
+   */
+  private class CloudFoundryUAAService(authorizeUrl: String, accessTokenEndpoint: String)
+    extends BaseApi20(authorizeUrl, accessTokenEndpoint) {
+
+    /**
+     * Base64-encodes the UTF-8 bytes of `in` as a single line.
+     *
+     * The basic java.util.Base64 encoder never inserts line separators, which matters
+     * because the result is placed in an HTTP header; a line-wrapping encoder would corrupt
+     * the header for long client id / secret pairs.
+     */
+    private def base64(in: String): String = {
+      java.util.Base64.getEncoder.encodeToString(
+        in.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+    }
+
+    override def createService(config: OAuthConfig): OAuth20Service = {
+      new OAuth20Service(this, config) {
+
+        /**
+         * Builds the access token request from scratch (the default implementation is not
+         * called): grant type, code, response type and redirect URI as parameters, plus the
+         * Content-Type and Basic Authorization headers.
+         */
+        protected override def createAccessTokenRequest[T <: AbstractRequest](
+            code: String, request: T): T = {
+          val config: OAuthConfig = getConfig()
+
+          request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
+          request.addParameter(OAuthConstants.CODE, code)
+          request.addParameter(RESPONSE_TYPE, "token")
+          request.addParameter(OAuthConstants.REDIRECT_URI, config.getCallback)
+
+          // Work around issue https://github.com/scribejava/scribejava/issues/641
+          request.addHeader("Content-Type", "application/x-www-form-urlencoded")
+
+          // CloudFoundry requires an Authorization header encoded with client id and secret.
+          val authorizationHeader = "Basic " + base64(config.getApiKey + ":" + config.getApiSecret)
+          request.addHeader("Authorization", authorizationHeader)
+          request
+        }
+      }
+    }
+  }
+
+  /**
+   * Additional authenticator to check more credential attributes of the user before logging in.
+   * This authenticator is applied AFTER the user passes the initial (default) authenticator.
+   */
+  trait AdditionalAuthenticator {
+
+    /**
+     * Initialization
+     *
+     * @param config Configurations specifically used for this authenticator.
+     * @param executionContext Execution Context to use to run futures.
+     */
+    def init(config: Config, executionContext: ExecutionContext): Unit
+
+    /**
+     *
+     * @param asyncClient HTTP client available for issuing further requests
+     * @param accessToken the access token for the UAA
+     * @param user user session returned by the previous authenticator
+     * @return an updated UserSession
+     */
+    def authenticate(
+        asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession)
+      : Future[UserSession]
+  }
+
+  val ORGANIZATION_URL = "organization-url"
+
+  /**
+   * Accepts a user only if a GET on the configured "organization-url", sent with the user's
+   * access token as bearer token, answers with HTTP status 200.
+   */
+  class OrganizationAccessChecker extends AdditionalAuthenticator {
+    private var organizationUrl: String = null
+    private implicit var executionContext: ExecutionContext = null
+
+    override def init(config: Config, executionContext: ExecutionContext): Unit = {
+      this.organizationUrl = config.getString(ORGANIZATION_URL)
+      this.executionContext = executionContext
+    }
+
+    /**
+     * @return a future completed with `user` on status 200, failed with an exception carrying
+     *   the response body on any other status, and failed with the underlying error when
+     *   the request itself fails.
+     */
+    override def authenticate(asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken,
+        user: UserSession): Future[UserSession] = {
+
+      val promise = Promise[UserSession]()
+      val builder = asyncClient.prepareGet(organizationUrl)
+      builder.addHeader("Authorization", s"bearer ${accessToken.getAccessToken}")
+      builder.execute(new AsyncCompletionHandler[Unit] {
+        override def onCompleted(response: client.Response): Unit = {
+          if (response.getStatusCode == 200) {
+            promise.trySuccess(user)
+          } else {
+            promise.tryFailure(new Exception(response.getResponseBody))
+          }
+        }
+
+        // Without this override a transport error is only logged by the default handler and
+        // the returned future would never complete.
+        override def onThrowable(t: Throwable): Unit = {
+          promise.tryFailure(t)
+        }
+      })
+      promise.future
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
new file mode 100644
index 0000000..c62071c
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2.impl
+
+import com.github.scribejava.apis.google.GoogleJsonTokenExtractor
+import com.github.scribejava.core.builder.api.DefaultApi20
+import com.github.scribejava.core.extractors.TokenExtractor
+import com.github.scribejava.core.model._
+import spray.json._
+
+/**
+ *
+ * Does authentication with the Google OAuth2 service. It only extracts the email address
+ * from the user's Google profile.
+ *
+ * Pre-requisite steps to use this Authenticator:
+ *
+ * Step1: Register your website as an OAuth2 Application on Google
+ * 1) Create an application representing your website at [[https://console.developers.google.com]]
+ *
+ * 2) In "API Manager" of your created application, enable API "Google+ API"
+ *
+ * 3) Create OAuth client ID for this application. In "Credentials" tab of "API Manager",
+ * choose "Create credentials", and then select OAuth client ID. Follow the wizard
+ * to set callback URL, and generate client ID, and client Secret. Callback URL is NOT optional.
+ *
+ * Step2: Configure the OAuth2 information in gear.conf
+ *
+ * 1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
+ * as true.
+ *
+ * 2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure
+ * class name, client ID, client Secret, and callback URL are set properly.
+ *
+ * NOTE: callback URL set here should match what is configured on Google in step1.
+ *
+ * Step3: Restart the UI service and try out the Google social login button in UI.
+ *
+ * NOTE: OAuth requires Internet access, @see
+ * [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] to find
+ * some helpful tutorials
+ *
+ * NOTE: Google uses scopes to define what data can be fetched by OAuth2. Currently we use the
+ * scope [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may change this
+ * scope in the future.
+ *
+ * TODO Currently, this doesn't verify the state from Google OAuth2 response.
+ *
+ * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] for more
+ * API information.
+ */
+class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator {
+
+  import org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._
+
+  protected override def authorizeUrl: String = AuthorizeUrl
+
+  protected override def accessTokenEndpoint: String = AccessEndpoint
+
+  protected override def protectedResourceUrl: String = ResourceUrl
+
+  protected override def scope: String = GoogleOAuth2Authenticator.Scope
+
+  /**
+   * Returns the "value" of the first entry of the "emails" array in the JSON object
+   * contained in `body`.
+   */
+  protected override def extractUserName(body: String): String = {
+    val profile = body.parseJson.asJsObject
+    val emailEntries = profile.fields("emails").asInstanceOf[JsArray].elements
+    val firstEntry = emailEntries(0).asJsObject("Cannot find email account")
+    firstEntry.fields("value").asInstanceOf[JsString].value
+  }
+
+  override def oauth2Api(): DefaultApi20 = {
+    new AsyncGoogleApi20(authorizeUrl, accessTokenEndpoint)
+  }
+}
+
+object GoogleOAuth2Authenticator {
+
+  import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._
+
+  // Format template of the authorization URL; the placeholders are filled by BaseApi20.
+  // scalastyle:off line.size.limit
+  val AuthorizeUrl: String = "https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
+  // scalastyle:on line.size.limit
+
+  // Endpoint used for the access token.
+  val AccessEndpoint: String = "https://www.googleapis.com/oauth2/v4/token"
+
+  // Protected resource URL used by the authenticator.
+  val ResourceUrl: String = "https://www.googleapis.com/plus/v1/people/me"
+
+  // OAuth2 scope requested from Google.
+  val Scope: String = "https://www.googleapis.com/auth/userinfo.email"
+
+  /** A [[BaseApi20]] that reads access tokens with Google's JSON token extractor. */
+  private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String)
+    extends BaseApi20(authorizeUrl, accessEndpoint) {
+
+    override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] =
+      GoogleJsonTokenExtractor.instance
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
new file mode 100644
index 0000000..caa3a33
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.util
+
+import upickle.Js
+
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.Graph
+
+object UpickleUtil {
+
+  // Implicit values need an EXPLICIT type annotation; otherwise upickle may NOT infer the
+  // reader type automatically.
+  // See issue https://github.com/lihaoyi/upickle-pprint/issues/102
+
+  /**
+   * Reads a graph from a JSON object with exactly two fields: the first holds the vertex
+   * list, the second the list of (Int, String, Int) edge triples. Field names are ignored.
+   */
+  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
+    upickle.default.Reader[Graph[Int, String]] {
+      case Js.Obj((_, vertexJson), (_, edgeJson)) =>
+        val vertices = upickle.default.readJs[List[Int]](vertexJson)
+        val edges = upickle.default.readJs[List[(Int, String, Int)]](edgeJson)
+        Graph(vertices, edges)
+    }
+
+  /** Reads a WorkerId from its JSON string form via WorkerId.parse. */
+  implicit val workerIdReader: upickle.default.Reader[WorkerId] =
+    upickle.default.Reader[WorkerId] {
+      case Js.Str(text) => WorkerId.parse(text)
+    }
+
+  /** Writes a WorkerId as the JSON string produced by WorkerId.render. */
+  implicit val workerIdWriter: upickle.default.Writer[WorkerId] =
+    upickle.default.Writer[WorkerId] {
+      case id: WorkerId => Js.Str(WorkerId.render(id))
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
deleted file mode 100644
index e67731e..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class AdminServiceSpec
- extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
- override def testConfig: Config = TestUtil.DEFAULT_CONFIG
-
- implicit def actorSystem: ActorSystem = system
-
- it should "shutdown the ActorSystem when receiving terminate" in {
- val route = new AdminService(actorSystem).route
- implicit val customTimeout = RouteTestTimeout(15.seconds)
- (Post(s"/terminate") ~> route) ~> check {
- assert(status.intValue() == 404)
- }
-
- Await.result(actorSystem.whenTerminated, 20.seconds)
-
- // terminate should terminate current actor system
- assert(actorSystem.whenTerminated.isCompleted)
- }
-}