You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:24 UTC

[14/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
new file mode 100644
index 0000000..32c9f08
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import java.io.{File, IOException}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.nio.file.Files
+import java.nio.file.StandardOpenOption.{APPEND, WRITE}
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
+import akka.http.scaladsl.unmarshalling.Unmarshaller._
+import akka.stream.Materializer
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig, SubmitApplicationResultValue}
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.worker.WorkerSummary
+import org.apache.gearpump.cluster.{ClusterConfig, UserConfig}
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription}
+import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication}
+import org.apache.gearpump.util.ActorUtil._
+import org.apache.gearpump.util.FileDirective._
+import org.apache.gearpump.util.{Constants, Graph, Util}
+
+/** Manages service for master node */
+class MasterService(val master: ActorRef,
+    val jarStore: JarStoreService, override val system: ActorSystem)
+  extends BasicService {
+
+  import upickle.default.{read, write}
+
+  private val systemConfig = system.settings.config
+  private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
+  // REST endpoints under the "master" prefix; results are serialized with upickle `write`.
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("master") {
+    pathEnd { // GET only: asks the master actor for MasterData
+      get {
+        onComplete(askActor[MasterData](master, GetMasterData)) {
+          case Success(value: MasterData) => complete(write(value))
+          case Failure(ex) => failWith(ex)
+        }
+      }
+    } ~
+    path("applist") { // asks the master actor for AppMastersData
+      onComplete(askActor[AppMastersData](master, AppMastersDataRequest)) {
+        case Success(value: AppMastersData) =>
+          complete(write(value))
+        case Failure(ex) => failWith(ex)
+      }
+    } ~
+    path("workerlist") { // asks the master for worker ids, then each worker for its data
+      def future: Future[List[WorkerSummary]] = askActor[WorkerList](master, GetAllWorkers)
+        .flatMap { workerList =>
+          val workers = workerList.workers
+          val workerDataList = List.empty[WorkerSummary]
+
+          Future.fold(workers.map { workerId =>
+            askWorker[WorkerData](master, workerId, GetWorkerData(workerId))
+          })(workerDataList) { (workerDataList, workerData) =>
+            workerDataList :+ workerData.workerDescription
+          }
+        }
+      onComplete(future) {
+        case Success(result: List[WorkerSummary]) => complete(write(result))
+        case Failure(ex) => failWith(ex)
+      }
+    } ~
+    path("config") { // rendered master config; "{}" when the returned config is null
+      onComplete(askActor[MasterConfig](master, QueryMasterConfig)) {
+        case Success(value: MasterConfig) =>
+          val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
+          complete(config)
+        case Failure(ex) =>
+          failWith(ex)
+      }
+    } ~
+    path("metrics" / RestPath) { path => // history metrics for the head of the remaining path
+      parameters(ParamMagnet(ReadOption.Key ? ReadOption.ReadLatest)) { readOption: String =>
+        val query = QueryHistoryMetrics(path.head.toString, readOption)
+        onComplete(askActor[HistoryMetrics](master, query)) {
+          case Success(value) =>
+            complete(write(value))
+          case Failure(ex) =>
+            failWith(ex)
+        }
+      }
+    } ~
+    path("submitapp") { // form fields: jar, configfile, configstring, executorcount, args
+      post {
+        uploadFile { form =>
+          val jar = form.getFile("jar").map(_.file)
+          val configFile = form.getFile("configfile").map(_.file)
+          val configString = form.getValue("configstring").getOrElse("")
+          val executorCount = form.getValue("executorcount").getOrElse("1").toInt
+          val args = form.getValue("args").getOrElse("")
+          // Merge the inline config string (if any) with the uploaded config file.
+          val mergedConfigFile = mergeConfig(configFile, configString)
+
+          onComplete(Future(
+            MasterService.submitGearApp(jar, executorCount, args, systemConfig, mergedConfigFile)
+          )) {
+            case Success(success) =>
+              val response = MasterService.AppSubmissionResult(success)
+              complete(write(response))
+            case Failure(ex) =>
+              failWith(ex)
+          }
+        }
+      }
+    } ~
+    path("submitstormapp") { // form fields: jar, configfile, args
+      post {
+        uploadFile { form =>
+          val jar = form.getFile("jar").map(_.file)
+          val configFile = form.getFile("configfile").map(_.file)
+          val args = form.getValue("args").getOrElse("")
+          onComplete(Future(
+            MasterService.submitStormApp(jar, configFile, args, systemConfig)
+          )) {
+            case Success(success) =>
+              val response = MasterService.AppSubmissionResult(success)
+              complete(write(response))
+            case Failure(ex) =>
+              failWith(ex)
+          }
+        }
+      }
+    } ~
+    path("submitdag") { // body: URL-encoded JSON SubmitApplicationRequest
+      post {
+        entity(as[String]) { request =>
+          val msg = java.net.URLDecoder.decode(request, "UTF-8")
+          val submitApplicationRequest = read[SubmitApplicationRequest](msg)
+          import submitApplicationRequest.{appName, dag, processors, userconfig}
+          val context = ClientContext(system.settings.config, system, master)
+          // Vertices are mapped through `processors`; edge labels name partitioner classes.
+          val graph = dag.mapVertex { processorId =>
+            processors(processorId)
+          }.mapEdge { (node1, edge, node2) =>
+            PartitionerDescription(new PartitionerByClassName(edge))
+          }
+          // A missing (null) userconfig falls back to UserConfig.empty.
+          val effectiveConfig = if (userconfig == null) UserConfig.empty else userconfig
+          val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph))
+
+          import upickle.default.write
+          val submitApplicationResultValue = SubmitApplicationResultValue(appId)
+          val jsonData = write(submitApplicationResultValue)
+          complete(jsonData)
+        }
+      }
+    } ~
+    path("uploadjar") { // passes the uploaded jar to Util.uploadJar with the jar store
+      uploadFile { form =>
+        val jar = form.getFile("jar").map(_.file)
+        if (jar.isEmpty) {
+          complete(write(
+            MasterService.Status(success = false, reason = "Jar file not found")))
+        } else {
+          val jarFile = Util.uploadJar(jar.get, jarStore)
+          complete(write(jarFile))
+        }
+      }
+    } ~
+    path("partitioners") { // class names of Constants.BUILTIN_PARTITIONERS
+      get {
+        complete(write(BuiltinPartitioners(Constants.BUILTIN_PARTITIONERS.map(_.getName))))
+      }
+    }
+  }
+
+  /**
+   * Combines an uploaded config file with an inline config string.
+   *
+   * @param configFile   config file taken from the upload form, if any
+   * @param configString inline config text; null or empty means there is nothing to merge
+   * @return `configFile` unchanged when there is no inline text; otherwise a file holding
+   *         the inline text (appended to `configFile` after a newline, or written to a
+   *         newly created temp file when no config file was uploaded)
+   */
+  private def mergeConfig(configFile: Option[File], configString: String): Option[File] = {
+    if (configString == null || configString.isEmpty) {
+      configFile
+    } else {
+      val merged = configFile match {
+        case Some(file) =>
+          Files.write(file.toPath, ("\n" + configString).getBytes(UTF_8), APPEND)
+          file
+        case None =>
+          // The prefix must not contain a double quote: it ends up in the file name and
+          // is not a legal file name character on every platform.
+          val file = File.createTempFile("userfile_configstring_", ".conf")
+          Files.write(file.toPath, configString.getBytes(UTF_8), WRITE)
+          file
+      }
+      Some(merged)
+    }
+  }
+}
+
+object MasterService {
+  /** JSON payload carrying the class names of the built-in partitioners. */
+  case class BuiltinPartitioners(partitioners: Array[String])
+  /** JSON payload returned by the submitapp and submitstormapp endpoints. */
+  case class AppSubmissionResult(success: Boolean)
+  /** Success flag plus a reason text; `reason` defaults to null. */
+  case class Status(success: Boolean, reason: String = null)
+
+  /**
+   * Submits a native application by running AppSubmitter with the uploaded jar,
+   * the requested executor count and the user supplied arguments.
+   */
+  def submitGearApp(
+      jar: Option[File], executorNum: Int, args: String,
+      systemConfig: Config, userConfigFile: Option[File]): Boolean = {
+    val submitterArgs =
+      Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args)
+    // Keep only the files that were actually uploaded.
+    val uploaded = Map("jar" -> jar).collect { case (key, Some(file)) => key -> file }
+    submitAndDeleteTempFiles(
+      "org.apache.gearpump.cluster.main.AppSubmitter",
+      argsArray = submitterArgs,
+      fileMap = uploaded,
+      classPath = getUserApplicationClassPath,
+      systemConfig,
+      userConfigFile
+    )
+  }
+
+  /**
+   * Submits a Storm application by running GearpumpStormClient with the uploaded jar,
+   * the optional storm config file and the user supplied arguments.
+   */
+  def submitStormApp(
+      jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = {
+    // Keep only the files that were actually uploaded.
+    val uploaded = Map("jar" -> jar, "config" -> stormConf).collect {
+      case (key, Some(file)) => key -> file
+    }
+    submitAndDeleteTempFiles(
+      "org.apache.gearpump.experiments.storm.main.GearpumpStormClient",
+      argsArray = spaceSeparatedArgumentsToArray(args),
+      fileMap = uploaded,
+      classPath = getStormApplicationClassPath,
+      systemConfig,
+      userConfigFile = None
+    )
+  }
+
+  /**
+   * Starts `mainClass` through Util.startProcess and returns true when it exits with
+   * code 0. The files in `fileMap` and the optional `userConfigFile` are deleted
+   * afterwards, whether or not the submission succeeded.
+   *
+   * Throws IOException when `fileMap` has no "jar" entry or the exit code is non-zero.
+   */
+  private def submitAndDeleteTempFiles(
+      mainClass: String, argsArray: Array[String], fileMap: Map[String, File],
+      classPath: Array[String], systemConfig: Config,
+      userConfigFile: Option[File] = None): Boolean = {
+    try {
+      if (!fileMap.contains("jar")) {
+        throw new IOException("JAR file not supplied")
+      }
+
+      val process = Util.startProcess(
+        clusterOptions(systemConfig, userConfigFile),
+        classPath,
+        mainClass,
+        arguments = createFilePathArgArray(fileMap) ++ argsArray
+      )
+
+      process.exitValue() match {
+        case 0 => true
+        case retval =>
+          throw new IOException(s"Process exit abnormally with exit code $retval.\n" +
+            s"Error message: ${process.logger.error}")
+      }
+    } finally {
+      fileMap.values.foreach(_.delete)
+      userConfigFile.foreach(_.delete())
+    }
+  }
+
+  /**
+   * Builds the `-D` JVM options for the submitter process: gearpump home, hostname,
+   * `Constants.PREFER_IPV4` set to true, one indexed entry per configured master and,
+   * when a user config file is given, its path.
+   */
+  private def clusterOptions(systemConfig: Config, userConfigFile: Option[File]): Array[String] = {
+    val baseOptions = Array(
+      s"-D${Constants.GEARPUMP_HOME}=${systemConfig.getString(Constants.GEARPUMP_HOME)}",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=${systemConfig.getString(Constants.GEARPUMP_HOSTNAME)}",
+      s"-D${Constants.PREFER_IPV4}=true"
+    )
+
+    val masterOptions = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .toList.flatMap(Util.parseHostList)
+      .zipWithIndex.map { case (master, index) =>
+        s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.$index=${master.host}:${master.port}"
+      }
+
+    val customConfigOption = userConfigFile.toList.map { file =>
+      s"-D${Constants.GEARPUMP_CUSTOM_CONFIG_FILE}=${file.getPath}"
+    }
+
+    baseOptions ++ masterOptions ++ customConfigOption
+  }
+
+  /**
+   * Flattens the file map into command line arguments: every entry contributes
+   * "-<key>" followed by the path of its file.
+   */
+  private def createFilePathArgArray(fileMap: Map[String, File]): Array[String] = {
+    fileMap.toSeq.flatMap { case (key, file) =>
+      Seq(s"-$key", file.getPath)
+    }.toArray
+  }
+
+  /**
+   * Splits the string on runs of spaces and drops empty tokens.
+   */
+  private def spaceSeparatedArgumentsToArray(str: String): Array[String] = {
+    val tokens = str.split(" +")
+    tokens.filterNot(_.isEmpty)
+  }
+
+  private val homeDir = System.getProperty(Constants.GEARPUMP_HOME) + "/"
+  private val libHomeDir = homeDir + "lib/"
+
+  /** Class path of a native application: the conf directory followed by the lib entries. */
+  private def getUserApplicationClassPath: Array[String] = {
+    val libEntries = Array("daemon/*", "yarn/*", "*").map(libHomeDir + _)
+    (homeDir + "conf") +: libEntries
+  }
+
+  /** Class path of a Storm application: the native class path plus the storm libraries. */
+  private def getStormApplicationClassPath: Array[String] = {
+    getUserApplicationClassPath :+ (libHomeDir + "storm/*")
+  }
+  /** Body of the "submitdag" request; a null `userconfig` is replaced by UserConfig.empty. */
+  case class SubmitApplicationRequest(
+      appName: String,
+      processors: Map[ProcessorId, ProcessorDescription],
+      dag: Graph[Int, String],
+      userconfig: UserConfig)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
new file mode 100644
index 0000000..87f9e34
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/RestServices.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.model.StatusCodes._
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{Route, _}
+import akka.stream.ActorMaterializer
+import akka.util.Timeout
+import org.apache.commons.lang.exception.ExceptionUtils
+
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * Wires all REST API services into one route. When UI authentication is enabled the
+ * services are wrapped by [[SecurityService]]; static resources are always tried last.
+ */
+class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem)
+  extends RouteService {
+
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+
+  private val config = system.settings.config
+
+  private val jarStoreService = JarStoreService.get(config)
+  jarStoreService.init(config, system)
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  private val securityEnabled =
+    config.getBoolean(Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
+
+  private val supervisorPath =
+    system.settings.config.getString(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
+
+  // Logs the failing request URI and answers 500 with the exception's stack trace.
+  private val myExceptionHandler: ExceptionHandler = ExceptionHandler {
+    case ex: Throwable =>
+      extractUri { uri =>
+        LOG.error(s"Request to $uri could not be handled normally", ex)
+        complete(InternalServerError, ExceptionUtils.getStackTrace(ex))
+      }
+  }
+
+  // Makes sure staticRoute is the final one, as it will try to lookup resource in local path
+  // if there is no match in previous routes
+  private val static = new StaticService(system, supervisorPath).route
+
+  /** Resolves the supervisor actor (5 second limit); null when no path is configured. */
+  def supervisor: ActorRef = {
+    val configured = supervisorPath != null && !supervisorPath.isEmpty()
+    if (configured) {
+      val resolving = system.actorSelection(supervisorPath).resolveOne()
+      Await.result(resolving, new Timeout(Duration.create(5, "seconds")).duration)
+    } else {
+      null
+    }
+  }
+
+  override def route: Route = {
+    if (!securityEnabled) {
+      handleExceptions(myExceptionHandler) {
+        services.route ~ static
+      }
+    } else {
+      val security = new SecurityService(services, system)
+      handleExceptions(myExceptionHandler) {
+        security.route ~ static
+      }
+    }
+  }
+
+  // Creates the individual services and chains their routes in a fixed order.
+  private def services: RouteService = {
+
+    val admin = new AdminService(system)
+    val masterService = new MasterService(master, jarStoreService, system)
+    val worker = new WorkerService(master, system)
+    val app = new AppMasterService(master, jarStoreService, system)
+    val sup = new SupervisorService(master, supervisor, system)
+
+    new RouteService {
+      override def route: Route =
+        admin.route ~ sup.route ~ masterService.route ~ worker.route ~ app.route
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
new file mode 100644
index 0000000..804b34f
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SecurityService.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair}
+import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri}
+import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet
+import akka.stream.Materializer
+import com.typesafe.config.Config
+import com.softwaremill.session.SessionDirectives._
+import com.softwaremill.session.SessionOptions._
+import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager}
+import upickle.default.write
+
+import org.apache.gearpump.security.{Authenticator => BaseAuthenticator}
+import org.apache.gearpump.services.SecurityService.{User, UserSession}
+import org.apache.gearpump.services.security.oauth2.OAuth2Authenticator
+import org.apache.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * Security authentication endpoint.
+ *
+ * - When the user cannot be authenticated, rejects with AuthenticationFailedRejection (401).
+ * - When the user is authenticated but not authorized to access a certain resource,
+ *   rejects with AuthorizationFailedRejection (403).
+ * - When the web UI frontend receives 401, it should redirect the UI to the login page.
+ * - When the web UI receives 403, it should display an error like
+ *   "current user is not authorized to access this resource."
+ *
+ * The Authenticator used is pluggable; the current Authenticator is resolved by looking up
+ * config path [[org.apache.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]].
+ *
+ * See [[org.apache.gearpump.security.Authenticator]] to find more info on custom Authenticator.
+ */
+class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService {
+
+  // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box.
+  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump",
+    params = Map.empty)
+
+  val LOG = LogUtil.getLogger(getClass, "AUDIT")
+
+  private val config = system.settings.config
+  private val sessionConfig = SessionConfig.fromConfig(config)
+  private implicit val sessionManager: SessionManager[UserSession] =
+    new SessionManager[UserSession](sessionConfig)
+
+  private val authenticator = {
+    val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS))
+    val constructor = clazz.getConstructor(classOf[Config])
+    val authenticator = constructor.newInstance(config).asInstanceOf[BaseAuthenticator]
+    authenticator
+  }
+
+  // Reads the config object at `path` and turns each top-level value into its string form.
+  private def configToMap(config: Config, path: String) = {
+    import scala.collection.JavaConverters._
+    val unwrapped = config.getConfig(path).root.unwrapped.asScala.toMap
+    unwrapped.map { case (key, value) => key -> value.toString }
+  }
+
+  // Provider name -> icon path for every configured OAuth2 authenticator; empty when
+  // OAuth2 authentication is disabled.
+  private val oauth2Providers: Map[String, String] = {
+    if (!config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)) {
+      Map.empty[String, String]
+    } else {
+      val providerNames =
+        configToMap(config, Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS).keys.toList
+      providerNames.map { key =>
+        key -> config.getString(s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$key.icon")
+      }.toMap
+    }
+  }
+
+  // Delegates to the configured authenticator; yields a session only when the
+  // credentials were accepted.
+  private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext)
+  : Future[Option[UserSession]] = {
+    authenticator.authenticate(user, pass, ec).map { result =>
+      if (!result.authenticated) {
+        None
+      } else {
+        Some(UserSession(user, result.permissionLevel))
+      }
+    }
+  }
+
+  // Rejection used when the request carries no session.
+  private def rejectMissingCredentials: Route =
+    reject(AuthenticationFailedRejection(CredentialsMissing, challenge))
+
+  // Rejection used when the supplied credentials were not accepted.
+  private def rejectWrongCredentials: Route =
+    reject(AuthenticationFailedRejection(CredentialsRejected, challenge))
+
+  // Runs `inner` with the current session, or rejects when the request has none.
+  private def requireAuthentication(inner: UserSession => Route): Route = {
+    optionalSession(oneOff, usingCookiesOrHeaders) {
+      case Some(session) => inner(session)
+      case None => rejectMissingCredentials
+    }
+  }
+
+  /**
+   * Stores `session` in the session cookie, sets a "username" cookie and then either
+   * redirects to "/" or responds with the serialized user.
+   *
+   * @param session        authenticated session to persist
+   * @param ip             client address, only used in the log line
+   * @param redirectToRoot when true, answer with a temporary redirect to "/"
+   */
+  private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = {
+    setSession(oneOff, usingCookies, session) {
+      val user = session.user
+      // The Max-Age attribute of a cookie is expressed in seconds, so the session max age
+      // (default: 1 day) is used as is. Scaling it to milliseconds made the "username"
+      // cookie live 1000 times longer than the session itself.
+      val maxAgeSeconds = sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L)
+      setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"),
+        maxAge = Some(maxAgeSeconds))) {
+        LOG.info(s"user $user login from $ip")
+        if (redirectToRoot) {
+          redirect(Uri("/"), StatusCodes.TemporaryRedirect)
+        } else {
+          complete(write(new User(user)))
+        }
+      }
+    }
+  }
+
+  // Invalidates the session and answers with the serialized user that logged out.
+  private def logout(user: UserSession, ip: String): Route = {
+    invalidateSession(oneOff, usingCookies) { ctx =>
+      LOG.info(s"user ${user.user} logout from $ip")
+      val loggedOut = new User(user.user)
+      ctx.complete(write(loggedOut))
+    }
+  }
+
+  // Sessions below the regular user permission level (e.g. guests) are limited to GET;
+  // their PUT/DELETE/POST requests are rejected with AuthorizationFailedRejection.
+  private def requireAuthorization(user: UserSession, route: => Route): Route = {
+    val isValidUser = user.permissionLevel >= BaseAuthenticator.User.permissionLevel
+    if (isValidUser) {
+      route
+    } else {
+      // Possibly a guest or not authenticated.
+      (put | delete | post) {
+        reject(AuthorizationFailedRejection)
+      } ~
+      get {
+        route
+      }
+    }
+  }
+
+  // Directive that always provides RemoteAddress.Unknown; used as the alternative
+  // to extractClientIP.
+  private val unknownIp: Directive1[RemoteAddress] = {
+    Directive[Tuple1[RemoteAddress]] { inner =>
+      val unknown = Tuple1(RemoteAddress.Unknown)
+      inner(unknown)
+    }
+  }
+
+  override val route: Route = {
+    // Serves /login (form or OAuth2) and /logout; everything else requires a session.
+    extractExecutionContext{implicit ec: ExecutionContext =>
+    extractMaterializer{implicit mat: Materializer =>
+    (extractClientIP | unknownIp) { ip =>
+      pathPrefix("login") {
+        pathEndOrSingleSlash {
+          get {
+            getFromResource("login/login.html")
+          } ~
+          post {
+            // Form login: expects the "username" and "password" form fields.
+            formField(FieldMagnet('username.as[String])) {user: String =>
+              formFields(FieldMagnet('password.as[String])) {pass: String =>
+                val result = authenticate(user, pass)
+                onSuccess(result) {
+                  case Some(session) =>
+                    login(session, ip.toString)
+                  case None =>
+                    rejectWrongCredentials
+                }
+              }
+            }
+          }
+        } ~
+        path ("oauth2" / "providers") {
+          // Responds with a list of OAuth2 providers.
+          complete(write(oauth2Providers))
+        } ~
+        // Support OAUTH Authentication
+        pathPrefix ("oauth2"/ Segment) {providerName =>
+        // Resolve OAUTH Authentication Provider
+        val oauthService = OAuth2Authenticator.get(config, providerName, ec)
+
+          if (oauthService == null) {
+            // OAuth2 is disabled.
+            complete(StatusCodes.NotFound)
+          } else {
+            // Shared by "accesstoken" and "callback": a successful login redirects to "/".
+            def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = {
+              val result = oauthService.authenticate(parameters)
+              onComplete(result) {
+                case Success(session) =>
+                  login(session, ip.toString, redirectToRoot = true)
+                case Failure(ex) => {
+                  LOG.info(s"Failed to login user from ${ip.toString}", ex)
+                  rejectWrongCredentials
+                }
+              }
+            }
+
+            path ("authorize") {
+              // Redirects to OAuth2 service provider for authorization.
+              redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect)
+            } ~
+            path ("accesstoken") {
+              post {
+                // Login with an access token passed in the "accesstoken" form field.
+                formField(FieldMagnet('accesstoken.as[String])) {accesstoken: String =>
+                  loginWithOAuth2Parameters(Map("accesstoken" -> accesstoken))
+                }
+              }
+            } ~
+            path("callback") {
+              // Login with authorization code or access token.
+              parameterMap {parameters =>
+                loginWithOAuth2Parameters(parameters)
+              }
+            }
+          }
+        }
+      } ~
+      path("logout") {
+        post {
+          requireAuthentication {session =>
+            logout(session, ip.toString())
+          }
+        }
+      } ~
+      requireAuthentication {user => // all remaining routes need a session
+        requireAuthorization(user, inner.route)
+      }
+    }}}
+
+  }
+}
+
+object SecurityService {
+
+  val SESSION_MANAGER_KEY = "akka.http.session.server-secret"
+
+  /** Session payload: the user name and the permission level of that user. */
+  case class UserSession(user: String, permissionLevel: Int)
+
+  object UserSession {
+
+    private val User = "user"
+    private val PermissionLevel = "permissionLevel"
+
+    /** Stores a UserSession as a string map and restores it from one. */
+    implicit def serializer: MultiValueSessionSerializer[UserSession] = {
+      def encode(session: UserSession): Map[String, String] =
+        Map(User -> session.user, PermissionLevel -> session.permissionLevel.toString)
+
+      // Fails when the user key is absent or the permission level cannot be read.
+      def decode(fields: Map[String, String]): Try[UserSession] =
+        if (!fields.contains(User)) {
+          Failure[UserSession](new Exception("Fail to parse session "))
+        } else {
+          Try(UserSession(fields(User), fields(PermissionLevel).toInt))
+        }
+
+      new MultiValueSessionSerializer[UserSession](toMap = encode, fromMap = decode)
+    }
+  }
+
+  /** Response payload wrapping a user name. */
+  case class User(user: String)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
new file mode 100644
index 0000000..284d3f2
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.stream.Materializer
+
+import org.apache.gearpump.util.Util
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * Serves the version string, the supervisor actor path, the index page, webjars and
+ * any other resource addressed by the request path.
+ */
+class StaticService(override val system: ActorSystem, supervisorPath: String)
+  extends BasicService {
+
+  private val version = Util.version
+
+  protected override def prefix = Neutral
+
+  override def cache: Boolean = true
+
+  protected override def doRoute(implicit mat: Materializer) = {
+    val versionRoute = path("version") {
+      get { ctx =>
+        ctx.complete(version)
+      }
+    }
+
+    // For YARN usage, we need to make sure supervisor-path
+    // can be accessed without authentication.
+    val supervisorPathRoute = path("supervisor-actor-path") {
+      get {
+        complete(supervisorPath)
+      }
+    }
+
+    val indexRoute = pathEndOrSingleSlash {
+      getFromResource("index.html")
+    }
+
+    val faviconRoute = path("favicon.ico") {
+      complete(StatusCodes.NotFound)
+    }
+
+    val webjarsRoute = pathPrefix("webjars") {
+      get {
+        getFromResourceDirectory("META-INF/resources/webjars")
+      }
+    }
+
+    // Catch-all: the rest of the path is used as the resource name.
+    val resourceRoute = path(Rest) { resource =>
+      getFromResource("%s".format(resource))
+    }
+
+    versionRoute ~ supervisorPathRoute ~ indexRoute ~ faviconRoute ~ webjarsRoute ~
+      resourceRoute
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala
new file mode 100644
index 0000000..fecf5ad
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/SupervisorService.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.stream.Materializer
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.services.SupervisorService.{Path, Status}
+import org.apache.gearpump.util.ActorUtil._
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/** Responsible for adding/removing machines. Typically it delegates to YARN. */
+class SupervisorService(
+    val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem)
+  extends BasicService {
+
+  import upickle.default.write
+
+  /**
+   * Guards a route that needs a supervisor actor.
+   *
+   * `internal` is passed by name on purpose: the guarded route sends actor messages
+   * while it is being built, so it must not be evaluated before the null check.
+   *
+   * TODO: Add additional check to ensure the user have enough authorization to
+   * add/remove a worker machine
+   *
+   * @param internal route to run when a supervisor is available
+   * @return `internal`, or a failing route when there is no supervisor
+   */
+  private def authorize(internal: => Route): Route = {
+    if (supervisor == null) {
+      failWith(new Exception("API not enabled, cannot find a valid supervisor! " +
+        "Please make sure Gearpump is running on top of YARN or other resource managers"))
+    } else {
+      internal
+    }
+  }
+
+  /** Completes with the serialized command result, or fails the route with the error. */
+  private def completeCommand(result: Future[CommandResult]): Route = {
+    onComplete[CommandResult](result) {
+      case Success(value) =>
+        complete(write(value))
+      case Failure(ex) =>
+        failWith(ex)
+    }
+  }
+
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") {
+    pathEnd {
+      get {
+        // The path is reported as null when no supervisor is configured.
+        val path = Option(supervisor).map(_.path.toString).orNull
+        complete(write(Path(path)))
+      }
+    } ~
+    path("status") {
+      post {
+        complete(write(Status(enabled = supervisor != null)))
+      }
+    } ~
+    path("addworker" / IntNumber) { workerCount =>
+      post {
+        authorize {
+          completeCommand(askActor[CommandResult](supervisor, AddWorker(workerCount)))
+        }
+      }
+    } ~
+    path("removeworker" / Segment) { workerIdString =>
+      post {
+        authorize {
+          val workerId = WorkerId.parse(workerIdString)
+          // Ask the master for the worker's container id, then have the supervisor
+          // remove that container.
+          val removal: Future[CommandResult] =
+            askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap { data =>
+              val containerId = data.workerDescription.resourceManagerContainerId
+              askActor[CommandResult](supervisor, RemoveWorker(containerId))
+            }
+          completeCommand(removal)
+        }
+      }
+    }
+  }
+}
+
+object SupervisorService {
+  /** Response payload reporting whether the supervisor API is enabled. */
+  case class Status(enabled: Boolean)
+
+  /** Response payload wrapping an actor path given as a string. */
+  case class Path(path: String)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
new file mode 100644
index 0000000..8268d61
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/WorkerService.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.util.{Failure, Success}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.server.Directives._
+import akka.stream.Materializer
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ReadOption}
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.ActorUtil._
+import org.apache.gearpump.util.Constants
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/** Service to handle worker related queries */
+class WorkerService(val master: ActorRef, override val system: ActorSystem)
+  extends BasicService {
+
+  import upickle.default.write
+  private val systemConfig = system.settings.config
+  private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
+
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) {
+    workerIdString =>
+      // Each sub-route parses the id itself, and only after its own path has matched.
+      def parseWorkerId = WorkerId.parse(workerIdString)
+
+      // worker/<id>: description of the worker.
+      val descriptionRoute = pathEnd {
+        val workerId = parseWorkerId
+        onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) {
+          case Success(data: WorkerData) =>
+            complete(write(data.workerDescription))
+          case Failure(ex) =>
+            failWith(ex)
+        }
+      }
+
+      // worker/<id>/config: rendered worker configuration, "{}" when there is none.
+      val configRoute = path("config") {
+        val workerId = parseWorkerId
+        onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) {
+          case Success(reply: WorkerConfig) =>
+            val rendered = Option(reply.config) match {
+              case Some(workerConfig) => ClusterConfig.render(workerConfig, concise)
+              case None => "{}"
+            }
+            complete(rendered)
+          case Failure(ex) =>
+            failWith(ex)
+        }
+      }
+
+      // worker/<id>/metrics/<path>: history metrics for the first element of <path>.
+      val metricsRoute = path("metrics" / RestPath ) { restPath =>
+        val workerId = parseWorkerId
+        parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
+          val query = QueryHistoryMetrics(restPath.head.toString, readOption)
+          onComplete(askWorker[HistoryMetrics](master, workerId, query)) {
+            case Success(metrics) =>
+              complete(write(metrics))
+            case Failure(ex) =>
+              failWith(ex)
+          }
+        }
+      }
+
+      descriptionRoute ~ configRoute ~ metricsRoute
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala
new file mode 100644
index 0000000..23c95c4
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/main/Services.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.main
+
+import java.util.Random
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.server.Route
+import akka.stream.ActorMaterializer
+import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+import sun.misc.BASE64Encoder
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear}
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.services.{RestServices, SecurityService}
+import org.apache.gearpump.util.LogUtil.ProcessType
+import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
+
+/**
+ * Command line to start UI server.
+ *
+ * Accepted options: `master` (host:port), a custom configuration file
+ * (`Gear.OPTION_CONFIG`) and `supervisor` (supervisor actor path, "" by default).
+ */
+object Services extends AkkaApp with ArgumentsParser {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "master" -> CLIOption("<host:port>", required = false),
+    Gear.OPTION_CONFIG -> CLIOption("<provide a custom configuration file>", required = false),
+    "supervisor" -> CLIOption("<Supervisor Actor Path>", required = false, Some("")))
+
+  override val description = "UI Server"
+
+  override def akkaConfig: Config = {
+    ClusterConfig.ui()
+  }
+
+  override def help(): Unit = {
+    // scalastyle:off println
+    Console.println("UI Server")
+    // scalastyle:on println
+  }
+
+  // Set by main() once the HTTP server is bound; used by kill() to stop the actor system.
+  private var killFunction: Option[() => Unit] = None
+
+  /**
+   * Starts the actor system and the HTTP server, then blocks until the actor system
+   * terminates.
+   *
+   * @param inputAkkaConf configuration used unless a custom config file is given in `args`
+   * @param args command line arguments, see [[options]]
+   */
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+
+    val argConfig = parse(args)
+    var akkaConf =
+      if (argConfig.exists(Gear.OPTION_CONFIG)) {
+        ClusterConfig.ui(argConfig.getString(Gear.OPTION_CONFIG))
+      } else {
+        inputAkkaConf
+      }
+
+    // Shadows the object-level logger: this one is created after the logging
+    // configuration has been loaded.
+    val LOG: Logger = {
+      LogUtil.loadConfiguration(akkaConf, ProcessType.UI)
+      LogUtil.getLogger(getClass)
+    }
+
+    if (argConfig.exists("master")) {
+      val master = argConfig.getString("master")
+      akkaConf = akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
+        ConfigValueFactory.fromIterable(List(master).asJava))
+    }
+
+    akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH,
+      ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor")))
+      // Creates a random unique secret key for session manager.
+      // All previous stored session token cookies will be invalidated when UI
+      // server is restarted.
+      .withValue(SecurityService.SESSION_MANAGER_KEY,
+        ConfigValueFactory.fromAnyRef(randomServerSecret()))
+
+    val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .flatMap(Util.parseHostList)
+
+    implicit val system = ActorSystem("services", akkaConf)
+    implicit val executionContext = system.dispatcher
+
+    import scala.concurrent.duration._
+    val master = system.actorOf(MasterProxy.props(masterCluster, 1.day),
+      s"masterproxy${system.name}")
+    val (host, port) = parseHostPort(system.settings.config)
+
+    implicit val mat = ActorMaterializer()
+    val services = new RestServices(master, mat, system)
+
+    val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port)
+    Await.result(bindFuture, 15.seconds)
+
+    // 0.0.0.0 is a bind address, not something to browse to.
+    val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host
+    LOG.info(s"Please browse to http://$displayHost:$port to see the web UI")
+
+    // scalastyle:off println
+    println(s"Please browse to http://$displayHost:$port to see the web UI")
+    // scalastyle:on println
+
+    killFunction = Some { () =>
+      LOG.info("Shutting down UI Server")
+      system.terminate()
+    }
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  /**
+   * Generates the secret key for the session manager.
+   *
+   * The key protects session tokens, so it is drawn from a cryptographically strong
+   * generator rather than `java.util.Random`, whose output is predictable.
+   *
+   * @return 64 random bytes encoded as Base64, without line breaks
+   */
+  private def randomServerSecret(): String = {
+    val length = 64 // Required
+    val bytes = new Array[Byte](length)
+    new java.security.SecureRandom().nextBytes(bytes)
+    java.util.Base64.getEncoder.encodeToString(bytes)
+  }
+
+  /** Reads the HTTP host and port of the UI server from the configuration. */
+  private def parseHostPort(config: Config): (String, Int) = {
+    val port = config.getInt(Constants.GEARPUMP_SERVICE_HTTP)
+    val host = config.getString(Constants.GEARPUMP_SERVICE_HOST)
+    (host, port)
+  }
+
+  // TODO: fix this
+  // Hacks around for YARN module, so that we can kill the UI server
+  // when application is shutting down.
+  // Does nothing when the server has not been started yet.
+  def kill(): Unit = {
+    killFunction.foreach(shutdown => shutdown())
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala
new file mode 100644
index 0000000..12dec1d
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/package.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump
+
+package object services {
+  // Version label of the REST API; presumably used when building service URL
+  // prefixes -- confirm against the callers.
+  final val REST_VERSION = "v1.0"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala
new file mode 100644
index 0000000..e9008aa
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/OAuth2Authenticator.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.services.SecurityService.UserSession
+import org.apache.gearpump.util.Constants
+import org.apache.gearpump.util.Constants._
+
+/**
+ *
+ * Uses OAuth2 social-login as the mechanism for authentication.
+ * @see [[https://tools.ietf.org/html/rfc6749]] to find out what OAuth2 is, and how it works.
+ *
+ * Basic flow of OAuth2 authentication:
+ *  1. User accesses the Gearpump UI website, and chooses to log in with an OAuth2 server.
+ *  2. Gearpump UI website redirects the user to the OAuth2 server's authorization endpoint.
+ *  3. End user completes the authorization in the domain of the OAuth2 server.
+ *  4. OAuth2 server redirects the user back to the Gearpump UI server.
+ *  5. Gearpump UI server verifies the tokens and extracts credentials from query
+ *    parameters and form fields.
+ *
+ * NOTE: '''Thread-safety''' is a MUST requirement. Developers need to ensure the sub-class is
+ *      thread-safe. The sub-class should have a parameterless constructor.
+ *
+ * NOTE:  OAuth2 Authenticator requires Internet access. Please make sure the HTTP proxy is
+ * set properly if one applies.
+ *
+ * Example: Configure the proxy when the UI server is started on Windows:
+ * {{{
+ *   > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
+ *      -Dhttps.proxyPort=8088
+ *   > bin\services
+ * }}}
+ *
+ * Example: Configure the proxy when the UI server is started on Linux:
+ * {{{
+ *   $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
+ *      -Dhttps.proxyPort=8088"
+ *   $ bin/services
+ * }}}
+ */
+trait OAuth2Authenticator {
+
+  /**
+   * Initializes the authenticator with a config which contains client ID, client secret, etc.
+   *
+   * Typically, the client key and client secret are provided by the OAuth2 Authorization
+   * server when the user registers an application there.
+   * See [[https://tools.ietf.org/html/rfc6749]] for the definition of client, client Id,
+   * and client secret.
+   *
+   * See [[https://developer.github.com/v3/oauth/]] for an actual example of how Github
+   * uses client key and client secret.
+   *
+   * NOTE:  '''Thread-Safety''': Framework ensures this call is synchronized.
+   *
+   * @param config Client Id, client secret, callback URL, etc.
+   * @param executionContext ExecutionContext from hosting environment.
+   */
+  def init(config: Config, executionContext: ExecutionContext): Unit
+
+  /**
+   * Returns the OAuth2 authorization URL the user is redirected to in order to do the
+   * OAuth2 authorization.
+   *
+   * NOTE:  '''Thread-Safety''': This can be called in a multi-thread environment. Developers
+   *      need to ensure thread safety.
+   */
+  def getAuthorizationUrl: String
+
+  /**
+   * After authorization, the OAuth2 server redirects the user back with tokens. This
+   * verifies the tokens, retrieves the profile, and returns [[UserSession]] information.
+   *
+   * NOTE:  This is an Async call.
+   *
+   * NOTE:  This call requires external internet access.
+   *
+   * NOTE:  '''Thread-Safety''': This can be called in a multi-thread environment. Developers
+   *      need to ensure thread safety.
+   *
+   * @param parameters HTTP Query and Post parameters, which typically contain the
+   *                   Authorization code.
+   * @return UserSession if authentication passes.
+   */
+  def authenticate(parameters: Map[String, String]): Future[UserSession]
+
+  /**
+   * Cleans up resources.
+   */
+  def close(): Unit
+}
+
+object OAuth2Authenticator {
+
+  // Lookup cache of initialized authenticators, keyed by provider name. The map is
+  // immutable and the reference is only replaced inside `this.synchronized`. It is
+  // volatile because get() first reads it without holding the lock.
+  @volatile private var providers = Map.empty[String, OAuth2Authenticator]
+
+  /**
+   * Load Authenticator from [[Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS]]
+   *
+   * The authenticator class named in the provider's config is instantiated through its
+   * parameterless constructor, initialized once, and cached for later calls.
+   *
+   * @param config Configuration holding the OAuth2 authenticator settings.
+   * @param provider, Name for the OAuth2 Authentication Service.
+   * @param executionContext Passed to the authenticator's `init`.
+   * @return Returns null if the OAuth2 Authentication is disabled, or if there is no
+   *         configuration for `provider`.
+   */
+  def get(config: Config, provider: String, executionContext: ExecutionContext)
+    : OAuth2Authenticator = {
+
+    providers.get(provider) match {
+      case Some(cached) =>
+        cached
+      case None =>
+        val path = s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$provider"
+        val enabled = config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)
+        if (enabled && config.hasPath(path)) {
+          this.synchronized {
+            // Check again under the lock: another thread may have created it meanwhile.
+            providers.getOrElse(provider, {
+              val authenticatorConfig = config.getConfig(path)
+              val authenticatorClass = authenticatorConfig.getString(
+                GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
+              val clazz = Thread.currentThread().getContextClassLoader
+                .loadClass(authenticatorClass)
+              val authenticator = clazz.newInstance().asInstanceOf[OAuth2Authenticator]
+              // Initialize before publishing, so readers never see a half-built instance.
+              authenticator.init(authenticatorConfig, executionContext)
+              providers += provider -> authenticator
+              authenticator
+            })
+          }
+        } else {
+          null
+        }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
new file mode 100644
index 0000000..603fb1d
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2.impl
+
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable.StringBuilder
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+import com.typesafe.config.Config
+import com.github.scribejava.core.builder.ServiceBuilderAsync
+import com.github.scribejava.core.builder.api.DefaultApi20
+import com.github.scribejava.core.model._
+import com.github.scribejava.core.oauth.OAuth20Service
+import com.github.scribejava.core.utils.OAuthEncoder
+import com.ning.http.client.AsyncHttpClientConfig
+
+import org.apache.gearpump.security.Authenticator
+import org.apache.gearpump.services.SecurityService.UserSession
+import org.apache.gearpump.services.security.oauth2.OAuth2Authenticator
+import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.Util
+
+/**
+ * Base authenticator that talks to an OAuth2 service through ScribeJava, configured
+ * with a Ning AsyncHttpClient.
+ *
+ * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]]
+ * for more API information.
+ */
+abstract class BaseOAuth2Authenticator extends OAuth2Authenticator {
+
+  /** Authorize Url for end user to authorize. */
+  protected def authorizeUrl: String
+
+  /** Endpoint used to fetch the Access Token. */
+  protected def accessTokenEndpoint: String
+
+  /** Protected resource Url to get the user profile. */
+  protected def protectedResourceUrl: String
+
+  /** Extracts the username information from the response of protectedResourceUrl. */
+  protected def extractUserName(body: String): String
+
+  /** Scope required to access protectedResourceUrl. */
+  protected def scope: String
+
+  /** OAuth2 endpoint definition for ScribeJava. */
+  protected def oauth2Api(): DefaultApi20 = new BaseApi20(authorizeUrl, accessTokenEndpoint)
+
+  protected var oauthService: OAuth20Service = null
+
+  protected var executionContext: ExecutionContext = null
+
+  private var defaultPermissionLevel = Authenticator.Guest.permissionLevel
+
+  // Synchronization ensured by the caller
+  // Only the first call has an effect; later calls leave the existing service in place.
+  override def init(config: Config, executionContext: ExecutionContext): Unit = {
+    val alreadyInitialized = this.oauthService != null
+    if (!alreadyInitialized) {
+      val callbackUrl = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK)
+      val id = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID)
+      val secret = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET)
+      val configuredRole = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE)
+      // Unknown role names map to the unauthenticated permission level.
+      defaultPermissionLevel = configuredRole match {
+        case "guest" => Authenticator.Guest.permissionLevel
+        case "user" => Authenticator.User.permissionLevel
+        case "admin" => Authenticator.Admin.permissionLevel
+        case _ => Authenticator.UnAuthenticated.permissionLevel
+      }
+      this.oauthService = buildOAuth2Service(id, secret, callbackUrl)
+      this.executionContext = executionContext
+    }
+  }
+
+  private val isClosed: AtomicBoolean = new AtomicBoolean(false)
+
+  // Closes the underlying HTTP client at most once.
+  override def close(): Unit = {
+    val firstClose = isClosed.compareAndSet(false, true)
+    if (firstClose && null != oauthService && null != oauthService.getAsyncHttpClient()) {
+      oauthService.getAsyncHttpClient().close()
+    }
+  }
+
+  override def getAuthorizationUrl(): String = oauthService.getAuthorizationUrl()
+
+  /**
+   * Fetches the protected resource with the given access token and turns the response
+   * body into a [[UserSession]] carrying the default permission level.
+   */
+  protected def authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = {
+    val session = Promise[UserSession]()
+    val profileRequest = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, oauthService)
+    oauthService.signRequest(accessToken, profileRequest)
+
+    val onProfile = new OAuthAsyncRequestCallback[Response] {
+      override def onCompleted(response: Response): Unit = {
+        try {
+          val userName = extractUserName(response.getBody)
+          session.success(new UserSession(userName, defaultPermissionLevel))
+        } catch {
+          case error: Throwable =>
+            session.failure(error)
+        }
+      }
+
+      override def onThrowable(throwable: Throwable): Unit = {
+        session.failure(throwable)
+      }
+    }
+
+    profileRequest.sendAsync(onProfile)
+    session.future
+  }
+
+  /**
+   * Exchanges the authorization code for an access token, then authenticates with it.
+   */
+  protected def authenticateWithAuthorizationCode(code: String): Future[UserSession] = {
+
+    implicit val ec: ExecutionContext = executionContext
+
+    val session = Promise[UserSession]()
+
+    val onToken = new OAuthAsyncRequestCallback[OAuth2AccessToken] {
+      override def onCompleted(accessToken: OAuth2AccessToken): Unit = {
+        // Forward the outcome, success or failure, to the outer promise.
+        authenticateWithAccessToken(accessToken).onComplete(result => session.complete(result))
+      }
+
+      override def onThrowable(throwable: Throwable): Unit = {
+        session.failure(throwable)
+      }
+    }
+
+    oauthService.getAccessTokenAsync(code, onToken)
+    session.future
+  }
+
+  // An access token, when present, takes precedence over an authorization code.
+  override def authenticate(parameters: Map[String, String]): Future[UserSession] = {
+
+    val code = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE)
+    val accessToken = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN)
+
+    (accessToken, code) match {
+      case (Some(token), _) =>
+        authenticateWithAccessToken(new OAuth2AccessToken(token))
+      case (None, Some(authorizationCode)) =>
+        authenticateWithAuthorizationCode(authorizationCode)
+      case _ =>
+        // Fails authentication if code not exist
+        Future.failed(
+          new Exception("Fail to authenticate user as there is no code parameter in URL"))
+    }
+  }
+
+  /** Builds the ScribeJava service, forcing asynchronous HTTP requests. */
+  private def buildOAuth2Service(clientId: String, clientSecret: String, callback: String)
+    : OAuth20Service = {
+    val state: String = s"state${Util.randInt()}"
+    ScribeJavaConfig.setForceTypeOfHttpRequests(
+      ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS)
+
+    val httpConfigBuilder = new AsyncHttpClientConfig.Builder()
+      .setMaxConnections(5)
+      .setUseProxyProperties(true)
+      .setRequestTimeout(60000)
+      .setAllowPoolingConnections(false)
+      .setPooledConnectionIdleTimeout(60000)
+      .setReadTimeout(60000)
+    val clientConfig: AsyncHttpClientConfig = httpConfigBuilder.build
+
+    new ServiceBuilderAsync()
+      .apiKey(clientId)
+      .apiSecret(clientSecret)
+      .scope(scope)
+      .state(state)
+      .callback(callback)
+      .asyncHttpClientConfig(clientConfig)
+      .build(oauth2Api())
+  }
+}
+
+object BaseOAuth2Authenticator {
+
+  /**
+   * ScribeJava API definition built from two URLs.
+   *
+   * `authorizeUrl` is used as a format string that receives, in order, the response
+   * type, the API key, the encoded callback and the encoded scope.
+   */
+  class BaseApi20(authorizeUrl: String, accessTokenEndpoint: String) extends DefaultApi20 {
+    def getAccessTokenEndpoint: String = accessTokenEndpoint
+
+    def getAuthorizationUrl(config: OAuthConfig): String = {
+      val baseUrl = String.format(authorizeUrl,
+        config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback),
+        OAuthEncoder.encode(config.getScope))
+      // The state parameter is appended only when one is configured.
+      Option(config.getState) match {
+        case Some(state) =>
+          s"$baseUrl&${OAuthConstants.STATE}=${OAuthEncoder.encode(state)}"
+        case None =>
+          baseUrl
+      }
+    }
+
+    override def createService(config: OAuthConfig): OAuth20Service = {
+      new OAuth20Service(this, config) {
+
+        protected override def createAccessTokenRequest[T <: AbstractRequest](
+            code: String, request: T): T = {
+          super.createAccessTokenRequest(code, request)
+
+          // Add the grant type ourselves when the config does not carry one.
+          val grantTypeMissing = !getConfig.hasGrantType
+          if (grantTypeMissing) {
+            request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
+          }
+
+          // Work-around for issue https://github.com/scribejava/scribejava/issues/641
+          request.addHeader("Content-Type", "application/x-www-form-urlencoded")
+          request
+        }
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
new file mode 100644
index 0000000..ded50a7
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2.impl
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+import com.typesafe.config.Config
+import com.github.scribejava.core.builder.api.DefaultApi20
+import com.github.scribejava.core.model._
+import com.github.scribejava.core.oauth.OAuth20Service
+import com.ning.http.client
+import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient}
+import spray.json.{JsString, _}
+import sun.misc.BASE64Encoder
+
+import org.apache.gearpump.services.SecurityService.UserSession
+import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
+import org.apache.gearpump.util.Constants._
+
+/**
+ *
+ * Does authentication with the CloudFoundry UAA service. Currently it only
+ * extracts the email address of the end user.
+ *
+ * For what is UAA,
+ * See [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]]
+ *      (User Account and Authentication Service)
+ *
+ * Pre-requisite steps to use this Authenticator:
+ *
+ * Step1: Register your website to UAA with tool uaac.
+ *  1) Check tutorial on uaac at
+ *    [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]]
+ *
+ *  2) Open a bash shell, set the UAA server by command `uaac target`
+ *    {{{
+ *    uaac target [your uaa server url]
+ *    }}}
+ *
+ * NOTE: [your uaa server url] should match the uaahost settings in gear.conf
+ *
+ *  3) Log in as user admin by
+ *     {{{
+ *     uaac token client get admin -s MyAdminPassword
+ *     }}}
+ *
+ *  4) Create a new Application (Client) in UAA,
+ * {{{
+ *   uaac client add [your_client_id]
+ *     --scope "openid cloud_controller.read"
+ *     --authorized_grant_types "authorization_code client_credentials refresh_token"
+ *     --authorities "openid cloud_controller.read"
+ *     --redirect_uri [your_redirect_url]
+ *     --autoapprove true
+ *     --secret [your_client_secret]
+ * }}}
+ *
+ * Step2: Configure the OAuth2 information in gear.conf
+ *
+ *  1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
+ * as true.
+ *
+ *  2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa"
+ *
+ *  3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section.
+ * Please make sure class name, client ID, client Secret, and callback URL are set properly.
+ *
+ * NOTE:  The callback URL here should match what you set on CloudFoundry UAA in step1.
+ *
+ * Step3: Restart the UI service and try the "social login" button for UAA.
+ *
+ * NOTE:  OAuth requires Internet access, @see
+ *       [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] to find tutorials to
+ *       configure Internet proxy.
+ *
+ * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] for more background
+ *     information of OAuth2.
+ */
+class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator {
+
+  import org.apache.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._
+
+  // Base URL of the UAA server; assigned from the "uaahost" setting in init().
+  private var host: String = null
+
+  // Optional second-stage check; stays None unless enabled in the configuration (see init()).
+  private var additionalAuthenticator: Option[AdditionalAuthenticator] = None
+
+  // The three URLs below are derived from `host` each time they are read.
+  protected override def authorizeUrl: String =
+    s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
+
+  protected override def accessTokenEndpoint: String = s"$host/oauth/token"
+
+  protected override def protectedResourceUrl: String = s"$host/userinfo"
+
+  protected override def scope: String = "openid,cloud_controller.read"
+
+  /**
+   * Reads the UAA host, initializes the base authenticator and, when
+   * "additional-authenticator-enabled" is true, instantiates and initializes the class named
+   * in the "additional-authenticator" config section.
+   *
+   * @param config configuration section of this authenticator
+   * @param executionContext execution context handed to the base class and to the additional
+   *                         authenticator
+   */
+  override def init(config: Config, executionContext: ExecutionContext): Unit = {
+    // Assign host before delegating: the URL definitions above interpolate it.
+    host = config.getString("uaahost")
+    super.init(config, executionContext)
+
+    val extraCheckEnabled = config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)
+    if (extraCheckEnabled) {
+      val extraConfig = config.getConfig(ADDITIONAL_AUTHENTICATOR)
+      val className = extraConfig.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
+      val loader = Thread.currentThread().getContextClassLoader
+      val extra = loader.loadClass(className).newInstance()
+        .asInstanceOf[AdditionalAuthenticator]
+      extra.init(extraConfig, executionContext)
+      additionalAuthenticator = Option(extra)
+    }
+  }
+
+  /** Returns the string value of the "email" field of the JSON object in `body`. */
+  protected override def extractUserName(body: String): String =
+    body.parseJson.asJsObject.fields("email").asInstanceOf[JsString].value
+
+  protected override def oauth2Api(): DefaultApi20 =
+    new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint)
+
+  /**
+   * Runs the base authentication and, if an additional authenticator is configured, chains it
+   * onto the resulting user session.
+   */
+  protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken)
+    : Future[UserSession] = {
+
+    implicit val ec: ExecutionContext = executionContext
+
+    val baseSession = super.authenticateWithAccessToken(accessToken)
+    additionalAuthenticator match {
+      case Some(extra) =>
+        baseSession.flatMap { user =>
+          extra.authenticate(oauthService.getAsyncHttpClient, accessToken, user)
+        }
+      case None =>
+        baseSession
+    }
+  }
+}
+
+object CloudFoundryUAAOAuth2Authenticator {
+  // Name of the extra form parameter added to the access token request.
+  private val RESPONSE_TYPE = "response_type"
+
+  // Config keys looked up in the authenticator's own config section.
+  val ADDITIONAL_AUTHENTICATOR_ENABLED = "additional-authenticator-enabled"
+  val ADDITIONAL_AUTHENTICATOR = "additional-authenticator"
+
+  /**
+   * API description for UAA: builds the access token request by hand and authenticates the
+   * client with an HTTP Basic Authorization header.
+   */
+  private class CloudFoundryUAAService(authorizeUrl: String, accessTokenEndpoint: String)
+    extends BaseApi20(authorizeUrl, accessTokenEndpoint) {
+
+    /**
+     * Base64-encodes the UTF-8 bytes of `in` as one unbroken line.
+     *
+     * java.util.Base64 is used instead of sun.misc.BASE64Encoder: the latter is a JDK-internal
+     * class and inserts line breaks into long output, which would corrupt the Authorization
+     * header for a long client id / secret pair.
+     */
+    private def base64(in: String): String = {
+      val bytes = in.getBytes(java.nio.charset.StandardCharsets.UTF_8)
+      java.util.Base64.getEncoder.encodeToString(bytes)
+    }
+
+    override def createService(config: OAuthConfig): OAuth20Service = {
+      new OAuth20Service(this, config) {
+
+        protected override def createAccessTokenRequest[T <: AbstractRequest](
+            code: String, request: T): T = {
+          val serviceConfig: OAuthConfig = getConfig()
+
+          request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
+          request.addParameter(OAuthConstants.CODE, code)
+          request.addParameter(RESPONSE_TYPE, "token")
+          request.addParameter(OAuthConstants.REDIRECT_URI, serviceConfig.getCallback)
+
+          // Work around issue https://github.com/scribejava/scribejava/issues/641
+          request.addHeader("Content-Type", "application/x-www-form-urlencoded")
+
+          // CloudFoundry requires a Authorization header encoded with client Id and secret.
+          val credentials = serviceConfig.getApiKey + ":" + serviceConfig.getApiSecret
+          request.addHeader("Authorization", "Basic " + base64(credentials))
+          request
+        }
+      }
+    }
+  }
+
+  /**
+   * Additional authenticator to check more credential attributes of user before logging in.
+   * This authenticator is applied AFTER user pass the initial (default) authenticator.
+   */
+  trait AdditionalAuthenticator {
+
+    /**
+     * Initialization
+     *
+     * @param config Configurations specifically used for this authenticator.
+     * @param executionContext Execution Context to use to run futures.
+     */
+    def init(config: Config, executionContext: ExecutionContext): Unit
+
+    /**
+     * Performs the additional check for an already authenticated user.
+     *
+     * @param asyncClient HTTP client available for calls made by this check
+     * @param accessToken the accessToken for the UAA
+     * @param user user session returned by previous authenticator
+     * @return an updated UserSession
+     */
+    def authenticate(
+        asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession)
+      : Future[UserSession]
+  }
+
+  // Config key holding the URL queried by OrganizationAccessChecker.
+  val ORGANIZATION_URL = "organization-url"
+
+  /**
+   * Additional authenticator that accepts a user only when a GET on the configured
+   * "organization-url", sent with the user's access token as bearer token, returns HTTP 200.
+   */
+  class OrganizationAccessChecker extends AdditionalAuthenticator {
+    // Both fields are populated by init(); they are null until then.
+    private var organizationUrl: String = null
+    private implicit var executionContext: ExecutionContext = null
+
+    override def init(config: Config, executionContext: ExecutionContext): Unit = {
+      this.organizationUrl = config.getString(ORGANIZATION_URL)
+      this.executionContext = executionContext
+    }
+
+    /**
+     * @return a future completed with `user` on HTTP 200, failed with the response body as the
+     *         exception message on any other status, or failed with the transport error when
+     *         the request itself does not complete.
+     */
+    override def authenticate(asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken,
+        user: UserSession): Future[UserSession] = {
+
+      val promise = Promise[UserSession]()
+      val builder = asyncClient.prepareGet(organizationUrl)
+      builder.addHeader("Authorization", s"bearer ${accessToken.getAccessToken}")
+      builder.execute(new AsyncCompletionHandler[Unit] {
+        override def onCompleted(response: client.Response): Unit = {
+          if (response.getStatusCode == 200) {
+            promise.success(user)
+          } else {
+            promise.failure(new Exception(response.getResponseBody))
+          }
+        }
+
+        // Propagate request failures (connection errors, timeouts, exceptions raised while
+        // handling the response); otherwise the returned future would never complete.
+        override def onThrowable(t: Throwable): Unit = {
+          promise.tryFailure(t)
+        }
+      })
+      promise.future
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
new file mode 100644
index 0000000..c62071c
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.security.oauth2.impl
+
+import com.github.scribejava.apis.google.GoogleJsonTokenExtractor
+import com.github.scribejava.core.builder.api.DefaultApi20
+import com.github.scribejava.core.extractors.TokenExtractor
+import com.github.scribejava.core.model._
+import spray.json._
+
+/**
+ *
+ * Does authentication with the Google OAuth2 service. It only extracts the email address
+ * from the user's Google profile.
+ *
+ * Pre-requisite steps to use this Authenticator:
+ *
+ * Step1: Register your website as an OAuth2 Application on Google
+ *  1) Create an application representing your website at [[https://console.developers.google.com]]
+ *
+ *  2) In "API Manager" of your created application, enable API "Google+ API"
+ *
+ *  3) Create OAuth client ID for this application. In "Credentials" tab of "API Manager",
+ * choose "Create credentials", and then select OAuth client ID. Follow the wizard
+ * to set callback URL, and generate client ID, and client Secret. Callback URL is NOT optional.
+ *
+ * Step2: Configure the OAuth2 information in gear.conf
+ *
+ *  1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
+ * as true.
+ *
+ *  2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure
+ * class name, client ID, client Secret, and callback URL are set properly.
+ *
+ * NOTE:  callback URL set here should match what is configured on Google in step1.
+ *
+ * Step3: Restart the UI service and try out the Google social login button in UI.
+ *
+ * NOTE:  OAuth requires Internet access, @see
+ *       [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] to find
+ *       some helpful tutorials
+ *
+ * NOTE:  Google uses scopes to define what data can be fetched via OAuth2. Currently we use
+ *       the scope [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may
+ *       change this scope in the future.
+ *
+ * TODO Currently, this doesn't verify the state from Google OAuth2 response.
+ *
+ * See [[org.apache.gearpump.services.security.oauth2.OAuth2Authenticator]] for more
+ * API information.
+ */
+class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator {
+
+  import org.apache.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._
+
+  // Endpoints and scope are the fixed values declared in the companion object.
+  protected override def authorizeUrl: String = AuthorizeUrl
+
+  protected override def accessTokenEndpoint: String = AccessEndpoint
+
+  protected override def protectedResourceUrl: String = ResourceUrl
+
+  protected override def scope: String = GoogleOAuth2Authenticator.Scope
+
+  /**
+   * Reads the user name from the JSON profile in `body`: the "value" field of the first
+   * element of the "emails" array.
+   */
+  protected override def extractUserName(body: String): String = {
+    val profile = body.parseJson.asJsObject
+    val firstEntry = profile.fields("emails").asInstanceOf[JsArray].elements(0)
+    firstEntry.asJsObject("Cannot find email account")
+      .fields("value").asInstanceOf[JsString].value
+  }
+
+  override def oauth2Api(): DefaultApi20 = {
+    new AsyncGoogleApi20(authorizeUrl, accessTokenEndpoint)
+  }
+}
+
+object GoogleOAuth2Authenticator {
+
+  import org.apache.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._
+
+  // Authorization URL template. The four %s slots are, per the query string, response_type,
+  // client_id, redirect_uri and scope.
+  // scalastyle:off line.size.limit
+  val AuthorizeUrl = "https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
+  // scalastyle:on line.size.limit
+  // Returned by GoogleOAuth2Authenticator.accessTokenEndpoint.
+  val AccessEndpoint = "https://www.googleapis.com/oauth2/v4/token"
+  // Returned by GoogleOAuth2Authenticator.protectedResourceUrl.
+  val ResourceUrl = "https://www.googleapis.com/plus/v1/people/me"
+  // Returned by GoogleOAuth2Authenticator.scope.
+  val Scope = "https://www.googleapis.com/auth/userinfo.email"
+
+  // BaseApi20 variant that supplies GoogleJsonTokenExtractor as its access token extractor.
+  private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String)
+    extends BaseApi20(authorizeUrl, accessEndpoint) {
+
+    override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] = {
+      GoogleJsonTokenExtractor.instance
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
new file mode 100644
index 0000000..caa3a33
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services.util
+
+import upickle.Js
+
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.Graph
+
+object UpickleUtil {
+
+  // Every implicit below carries an EXPLICIT type annotation; without it upickle may fail to
+  // infer the reader type automatically.
+  // See issue https://github.com/lihaoyi/upickle-pprint/issues/102
+
+  // Reads a Graph[Int, String] from a JSON object with two fields. The fields are taken by
+  // position (first: vertex list, second: list of edge triples); their names are not checked.
+  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
+    upickle.default.Reader[Graph[Int, String]] {
+      case Js.Obj(vertexField, edgeField) =>
+        Graph(
+          upickle.default.readJs[List[Int]](vertexField._2),
+          upickle.default.readJs[List[(Int, String, Int)]](edgeField._2))
+    }
+
+  // Reads a WorkerId from a JSON string via WorkerId.parse.
+  implicit val workerIdReader: upickle.default.Reader[WorkerId] = {
+    upickle.default.Reader[WorkerId] {
+      case Js.Str(text) => WorkerId.parse(text)
+    }
+  }
+
+  // Writes a WorkerId as a JSON string via WorkerId.render.
+  implicit val workerIdWriter: upickle.default.Writer[WorkerId] = {
+    upickle.default.Writer[WorkerId] {
+      case id: WorkerId => Js.Str(WorkerId.render(id))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
deleted file mode 100644
index e67731e..0000000
--- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.TestUtil
-
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-class AdminServiceSpec
-  extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll {
-
-  override def testConfig: Config = TestUtil.DEFAULT_CONFIG
-
-  implicit def actorSystem: ActorSystem = system
-
-  it should "shutdown the ActorSystem when receiving terminate" in {
-    val route = new AdminService(actorSystem).route
-    implicit val customTimeout = RouteTestTimeout(15.seconds)
-    (Post(s"/terminate") ~> route) ~> check {
-      assert(status.intValue() == 404)
-    }
-
-    Await.result(actorSystem.whenTerminated, 20.seconds)
-
-    // terminate should terminate current actor system
-    assert(actorSystem.whenTerminated.isCompleted)
-  }
-}