You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:25 UTC

[15/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
deleted file mode 100644
index d0fea30..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.http.scaladsl.model.StatusCodes._
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.{Route, _}
-import akka.stream.ActorMaterializer
-import akka.util.Timeout
-import org.apache.commons.lang.exception.ExceptionUtils
-
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.util.{Constants, LogUtil}
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/** Contains all REST API service endpoints */
-class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem)
-  extends RouteService {
-
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-
-  private val config = system.settings.config
-
-  private val jarStoreService = JarStoreService.get(config)
-  jarStoreService.init(config, system)
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  private val securityEnabled = config.getBoolean(
-    Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
-
-  private val supervisorPath = system.settings.config.getString(
-    Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
-
-  private val myExceptionHandler: ExceptionHandler = ExceptionHandler {
-    case ex: Throwable => {
-      extractUri { uri =>
-        LOG.error(s"Request to $uri could not be handled normally", ex)
-        complete(InternalServerError, ExceptionUtils.getStackTrace(ex))
-      }
-    }
-  }
-
-  // Makes sure the static route is the final one, as it will try to look up the resource in
-  // the local path if there is no match in the previous routes.
-  private val static = new StaticService(system, supervisorPath).route
-
-  def supervisor: ActorRef = {
-    if (supervisorPath == null || supervisorPath.isEmpty()) {
-      null
-    } else {
-      val actorRef = system.actorSelection(supervisorPath).resolveOne()
-      Await.result(actorRef, new Timeout(Duration.create(5, "seconds")).duration)
-    }
-  }
-
-  override def route: Route = {
-    if (securityEnabled) {
-      val security = new SecurityService(services, system)
-      handleExceptions(myExceptionHandler) {
-        security.route ~ static
-      }
-    } else {
-      handleExceptions(myExceptionHandler) {
-        services.route ~ static
-      }
-    }
-  }
-
-  private def services: RouteService = {
-
-    val admin = new AdminService(system)
-    val masterService = new MasterService(master, jarStoreService, system)
-    val worker = new WorkerService(master, system)
-    val app = new AppMasterService(master, jarStoreService, system)
-    val sup = new SupervisorService(master, supervisor, system)
-
-    new RouteService {
-      override def route: Route = {
-        admin.route ~ sup.route ~ masterService.route ~ worker.route ~ app.route
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
deleted file mode 100644
index b1abf62..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, HttpCookiePair}
-import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri}
-import akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._
-import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet
-import akka.stream.Materializer
-import com.typesafe.config.Config
-import com.softwaremill.session.SessionDirectives._
-import com.softwaremill.session.SessionOptions._
-import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, SessionManager}
-import upickle.default.write
-
-import io.gearpump.security.{Authenticator => BaseAuthenticator}
-import io.gearpump.services.SecurityService.{User, UserSession}
-import io.gearpump.services.security.oauth2.OAuth2Authenticator
-import io.gearpump.util.{Constants, LogUtil}
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/**
- * Security authentication endpoint.
- *
- * - When the user cannot be authenticated, reject with a 401 AuthenticationFailedRejection.
- * - When the user is authenticated but not authorized to access a certain resource,
- *   reject with a 403 AuthorizationFailedRejection.
- * - When the web UI frontend receives a 401, it should redirect the UI to the login page.
- * - When the web UI receives a 403, it should display an error like
- *   "current user is not authorized to access this resource."
- *
- * The Authenticator used is pluggable, the current Authenticator is resolved by looking up
- * config path [[io.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]].
- *
- * See [[io.gearpump.security.Authenticator]] to find more info on custom Authenticator.
- */
-class SecurityService(inner: RouteService, implicit val system: ActorSystem) extends RouteService {
-
-  // Use scheme "GearpumpBasic" to avoid popping up web browser native authentication box.
-  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = "gearpump",
-    params = Map.empty)
-
-  val LOG = LogUtil.getLogger(getClass, "AUDIT")
-
-  private val config = system.settings.config
-  private val sessionConfig = SessionConfig.fromConfig(config)
-  private implicit val sessionManager: SessionManager[UserSession] =
-    new SessionManager[UserSession](sessionConfig)
-
-  private val authenticator = {
-    val clazz = Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS))
-    val constructor = clazz.getConstructor(classOf[Config])
-    val authenticator = constructor.newInstance(config).asInstanceOf[BaseAuthenticator]
-    authenticator
-  }
-
-  private def configToMap(config: Config, path: String) = {
-    import scala.collection.JavaConverters._
-    config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k -> v.toString }
-  }
-
-  private val oauth2Providers: Map[String, String] = {
-    if (config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)) {
-      val map = configToMap(config, Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS)
-      map.keys.toList.map { key =>
-        val iconPath = config.getString(s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$key.icon")
-        (key, iconPath)
-      }.toMap
-    } else {
-      Map.empty[String, String]
-    }
-  }
-
-  private def authenticate(user: String, pass: String)(implicit ec: ExecutionContext)
-  : Future[Option[UserSession]] = {
-    authenticator.authenticate(user, pass, ec).map { result =>
-      if (result.authenticated) {
-        Some(UserSession(user, result.permissionLevel))
-      } else {
-        None
-      }
-    }
-  }
-
-  private def rejectMissingCredentials: Route = {
-    reject(AuthenticationFailedRejection(CredentialsMissing, challenge))
-  }
-
-  private def rejectWrongCredentials: Route = {
-    reject(AuthenticationFailedRejection(CredentialsRejected, challenge))
-  }
-
-  private def requireAuthentication(inner: UserSession => Route): Route = {
-    optionalSession(oneOff, usingCookiesOrHeaders) { sessionOption =>
-      sessionOption match {
-        case Some(session) => {
-          inner(session)
-        }
-        case None =>
-          rejectMissingCredentials
-      }
-    }
-  }
-
-  private def login(session: UserSession, ip: String, redirectToRoot: Boolean = false): Route = {
-    setSession(oneOff, usingCookies, session) {
-      val user = session.user
-      // Default: 1 day
-      val maxAgeMs = 1000 * sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 3600L)
-      setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = Some("/"),
-        maxAge = Some(maxAgeMs))) {
-        LOG.info(s"user $user login from $ip")
-        if (redirectToRoot) {
-          redirect(Uri("/"), StatusCodes.TemporaryRedirect)
-        } else {
-          complete(write(new User(user)))
-        }
-      }
-    }
-  }
-
-  private def logout(user: UserSession, ip: String): Route = {
-    invalidateSession(oneOff, usingCookies) { ctx =>
-      LOG.info(s"user ${user.user} logout from $ip")
-      ctx.complete(write(new User(user.user)))
-    }
-  }
-
-  // Users below the User permission level (e.g. guests) may only GET; put/delete/post are rejected.
-  private def requireAuthorization(user: UserSession, route: => Route): Route = {
-    // Valid user
-    if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) {
-      route
-    } else {
-      // Possibly a guest or not authenticated.
-      (put | delete | post) {
-        // Reject with 403 authorization error
-        reject(AuthorizationFailedRejection)
-      } ~
-      get {
-        route
-      }
-    }
-  }
-
-  private val unknownIp: Directive1[RemoteAddress] = {
-    Directive[Tuple1[RemoteAddress]]{ inner =>
-      inner(new Tuple1(RemoteAddress.Unknown))
-    }
-  }
-
-  override val route: Route = {
-
-    extractExecutionContext{implicit ec: ExecutionContext =>
-    extractMaterializer{implicit mat: Materializer =>
-    (extractClientIP | unknownIp) { ip =>
-      pathPrefix("login") {
-        pathEndOrSingleSlash {
-          get {
-            getFromResource("login/login.html")
-          } ~
-          post {
-            // Guest accounts don't have permission to submit new applications in the UI
-            formField(FieldMagnet('username.as[String])) {user: String =>
-              formFields(FieldMagnet('password.as[String])) {pass: String =>
-                val result = authenticate(user, pass)
-                onSuccess(result) {
-                  case Some(session) =>
-                    login(session, ip.toString)
-                  case None =>
-                    rejectWrongCredentials
-                }
-              }
-            }
-          }
-        } ~
-        path ("oauth2" / "providers") {
-          // Responds with a list of OAuth2 providers.
-          complete(write(oauth2Providers))
-        } ~
-        // Support OAUTH Authentication
-        pathPrefix ("oauth2"/ Segment) {providerName =>
-        // Resolve OAUTH Authentication Provider
-        val oauthService = OAuth2Authenticator.get(config, providerName, ec)
-
-          if (oauthService == null) {
-            // OAuth2 is disabled.
-            complete(StatusCodes.NotFound)
-          } else {
-
-            def loginWithOAuth2Parameters(parameters: Map[String, String]): Route = {
-              val result = oauthService.authenticate(parameters)
-              onComplete(result) {
-                case Success(session) =>
-                  login(session, ip.toString, redirectToRoot = true)
-                case Failure(ex) => {
-                  LOG.info(s"Failed to login user from ${ip.toString}", ex)
-                  rejectWrongCredentials
-                }
-              }
-            }
-
-            path ("authorize") {
-              // Redirects to OAuth2 service provider for authorization.
-              redirect(Uri(oauthService.getAuthorizationUrl), StatusCodes.TemporaryRedirect)
-            } ~
-            path ("accesstoken") {
-              post {
-                // Guest accounts don't have permission to submit new applications in the UI
-                formField(FieldMagnet('accesstoken.as[String])) {accesstoken: String =>
-                  loginWithOAuth2Parameters(Map("accesstoken" -> accesstoken))
-                }
-              }
-            } ~
-            path("callback") {
-              // Login with authorization code or access token.
-              parameterMap {parameters =>
-                loginWithOAuth2Parameters(parameters)
-              }
-            }
-          }
-        }
-      } ~
-      path("logout") {
-        post {
-          requireAuthentication {session =>
-            logout(session, ip.toString())
-          }
-        }
-      } ~
-      requireAuthentication {user =>
-        requireAuthorization(user, inner.route)
-      }
-    }}}
-
-  }
-}
-
-object SecurityService {
-
-  val SESSION_MANAGER_KEY = "akka.http.session.server-secret"
-
-  case class UserSession(user: String, permissionLevel: Int)
-
-  object UserSession {
-
-    private val User = "user"
-    private val PermissionLevel = "permissionLevel"
-
-    implicit def serializer: MultiValueSessionSerializer[UserSession] = {
-      new MultiValueSessionSerializer[UserSession](
-        toMap = {t: UserSession =>
-          Map(User -> t.user, PermissionLevel -> t.permissionLevel.toString)
-        },
-        fromMap = {m: Map[String, String] =>
-          if (m.contains(User)) {
-            Try(UserSession(m(User), m(PermissionLevel).toInt))
-          } else {
-            Failure[UserSession](new Exception("Fail to parse session "))
-          }
-        }
-      )
-    }
-  }
-
-  case class User(user: String)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
deleted file mode 100644
index fc990a1..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.model._
-import akka.http.scaladsl.server.Directives._
-import akka.stream.Materializer
-
-import io.gearpump.util.Util
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/**
- * Serves static resource files.
- */
-class StaticService(override val system: ActorSystem, supervisorPath: String)
-  extends BasicService {
-
-  private val version = Util.version
-
-  protected override def prefix = Neutral
-
-  override def cache: Boolean = true
-
-  protected override def doRoute(implicit mat: Materializer) = {
-    path("version") {
-      get { ctx =>
-        ctx.complete(version)
-      }
-    } ~
-    // For YARN usage, we need to make sure supervisor-path
-    // can be accessed without authentication.
-    path("supervisor-actor-path") {
-      get {
-        complete(supervisorPath)
-      }
-    } ~
-    pathEndOrSingleSlash {
-      getFromResource("index.html")
-    } ~
-    path("favicon.ico") {
-      complete(StatusCodes.NotFound)
-    } ~
-    pathPrefix("webjars") {
-      get {
-        getFromResourceDirectory("META-INF/resources/webjars")
-      }
-    } ~
-    path(Rest) { path =>
-      getFromResource("%s" format path)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
deleted file mode 100644
index c7f19e4..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.Route
-import akka.stream.Materializer
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.ClientToMaster._
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.services.SupervisorService.{Path, Status}
-import io.gearpump.util.ActorUtil._
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/** Responsible for adding/removing machines. Typically it delegates to YARN. */
-class SupervisorService(
-    val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem)
-  extends BasicService {
-
-  import upickle.default.write
-
-  /**
-   * TODO: Add an additional check to ensure the user has sufficient authorization to
-   * add/remove a worker machine.
-   */
-  private def authorize(internal: Route): Route = {
-    if (supervisor == null) {
-      failWith(new Exception("API not enabled, cannot find a valid supervisor! " +
-        "Please make sure Gearpump is running on top of YARN or other resource managers"))
-    } else {
-      internal
-    }
-  }
-
-  protected override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") {
-    pathEnd {
-      get {
-        val path = if (supervisor == null) {
-          null
-        } else {
-          supervisor.path.toString
-        }
-        complete(write(Path(path)))
-      }
-    } ~
-    path("status") {
-      post {
-        if (supervisor == null) {
-          complete(write(Status(enabled = false)))
-        } else {
-          complete(write(Status(enabled = true)))
-        }
-      }
-    } ~
-    path("addworker" / IntNumber) { workerCount =>
-      post {
-        authorize {
-          onComplete(askActor[CommandResult](supervisor, AddWorker(workerCount))) {
-            case Success(value) =>
-              complete(write(value))
-            case Failure(ex) =>
-              failWith(ex)
-          }
-        }
-      }
-    } ~
-    path("removeworker" / Segment) { workerIdString =>
-      post {
-        authorize {
-          val workerId = WorkerId.parse(workerIdString)
-          def future(): Future[CommandResult] = {
-            askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData =>
-              val containerId = workerData.workerDescription.resourceManagerContainerId
-              askActor[CommandResult](supervisor, RemoveWorker(containerId))
-            }
-          }
-
-          onComplete[CommandResult](future()) {
-            case Success(value) =>
-              complete(write(value))
-            case Failure(ex) =>
-              failWith(ex)
-          }
-        }
-      }
-    }
-  }
-}
-
-object SupervisorService {
-  case class Status(enabled: Boolean)
-
-  case class Path(path: String)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
deleted file mode 100644
index 78cdb37..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services
-
-import scala.util.{Failure, Success}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.http.scaladsl.server.Directives._
-import akka.stream.Materializer
-
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ReadOption}
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.ActorUtil._
-import io.gearpump.util.Constants
-// NOTE: This cannot be removed!!!
-import io.gearpump.services.util.UpickleUtil._
-
-/** Service to handle worker related queries */
-class WorkerService(val master: ActorRef, override val system: ActorSystem)
-  extends BasicService {
-
-  import upickle.default.write
-  private val systemConfig = system.settings.config
-  private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
-
-  protected override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) {
-    workerIdString => {
-      pathEnd {
-        val workerId = WorkerId.parse(workerIdString)
-        onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) {
-          case Success(value: WorkerData) =>
-            complete(write(value.workerDescription))
-          case Failure(ex) => failWith(ex)
-        }
-      }
-    }~
-    path("config") {
-      val workerId = WorkerId.parse(workerIdString)
-      onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) {
-        case Success(value: WorkerConfig) =>
-          val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
-          complete(config)
-        case Failure(ex) =>
-          failWith(ex)
-      }
-    } ~
-    path("metrics" / RestPath ) { path =>
-      val workerId = WorkerId.parse(workerIdString)
-      parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
-        val query = QueryHistoryMetrics(path.head.toString, readOption)
-        onComplete(askWorker[HistoryMetrics](master, workerId, query)) {
-          case Success(value) =>
-            complete(write(value))
-          case Failure(ex) =>
-            failWith(ex)
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala b/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
deleted file mode 100644
index a647a8d..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.main
-
-import java.util.Random
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-
-import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.server.Route
-import akka.stream.ActorMaterializer
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-import sun.misc.BASE64Encoder
-
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear}
-import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.services.{RestServices, SecurityService}
-import io.gearpump.util.LogUtil.ProcessType
-import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
-
-/** Command line to start UI server */
-object Services extends AkkaApp with ArgumentsParser {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "master" -> CLIOption("<host:port>", required = false),
-    Gear.OPTION_CONFIG -> CLIOption("<provide a custom configuration file>", required = false),
-    "supervisor" -> CLIOption("<Supervisor Actor Path>", required = false, Some("")))
-
-  override val description = "UI Server"
-
-  override def akkaConfig: Config = {
-    ClusterConfig.ui()
-  }
-
-  override def help(): Unit = {
-    // scalastyle:off println
-    Console.println("UI Server")
-    // scalastyle:on println
-  }
-
-  private var killFunction: Option[() => Unit] = None
-
-  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-
-    val argConfig = parse(args)
-    var akkaConf =
-      if (argConfig.exists(Gear.OPTION_CONFIG)) {
-        ClusterConfig.ui(argConfig.getString(Gear.OPTION_CONFIG))
-      } else {
-        inputAkkaConf
-      }
-
-    val LOG: Logger = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.UI)
-      LogUtil.getLogger(getClass)
-    }
-
-    if (argConfig.exists("master")) {
-      val master = argConfig.getString("master")
-      akkaConf = akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
-        ConfigValueFactory.fromIterable(List(master).asJava))
-    }
-
-    akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH,
-      ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor")))
-      // Creates a random unique secret key for the session manager.
-      // All previously stored session token cookies will be invalidated when the UI
-      // server is restarted.
-      .withValue(SecurityService.SESSION_MANAGER_KEY,
-        ConfigValueFactory.fromAnyRef(randomSeverSecret()))
-
-    val masterCluster = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
-      .flatMap(Util.parseHostList)
-
-    implicit val system = ActorSystem("services", akkaConf)
-    implicit val executionContext = system.dispatcher
-
-    import scala.concurrent.duration._
-    val master = system.actorOf(MasterProxy.props(masterCluster, 1.day),
-      s"masterproxy${system.name}")
-    val (host, port) = parseHostPort(system.settings.config)
-
-    implicit val mat = ActorMaterializer()
-    val services = new RestServices(master, mat, system)
-
-    val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), host, port)
-    Await.result(bindFuture, 15.seconds)
-
-    val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host
-    LOG.info(s"Please browse to http://$displayHost:$port to see the web UI")
-
-    // scalastyle:off println
-    println(s"Please browse to http://$displayHost:$port to see the web UI")
-    // scalastyle:on println
-
-    killFunction = Some { () =>
-      LOG.info("Shutting down UI Server")
-      system.terminate()
-    }
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  private def randomSeverSecret(): String = {
-    val random = new Random()
-    val length = 64 // Required
-    val bytes = new Array[Byte](length)
-    random.nextBytes(bytes)
-    val endecoder = new BASE64Encoder()
-    endecoder.encode(bytes)
-  }
-
-  private def parseHostPort(config: Config): (String, Int) = {
-    val port = config.getInt(Constants.GEARPUMP_SERVICE_HTTP)
-    val host = config.getString(Constants.GEARPUMP_SERVICE_HOST)
-    (host, port)
-  }
-
-  // TODO: fix this
-  // Workaround for the YARN module, so that we can kill the UI server
-  // when the application is shutting down.
-  def kill(): Unit = {
-    if (killFunction.isDefined) {
-      killFunction.get.apply()
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/package.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/package.scala b/services/jvm/src/main/scala/io/gearpump/services/package.scala
deleted file mode 100644
index 7b38134..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/package.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump
-
-package object services {
-  final val REST_VERSION = "v1.0"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
deleted file mode 100644
index 158d406..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2
-
-import scala.concurrent.{ExecutionContext, Future}
-
-import com.typesafe.config.Config
-
-import io.gearpump.services.SecurityService.UserSession
-import io.gearpump.util.Constants
-import io.gearpump.util.Constants._
-
-/**
- *
- * Uses OAuth2 social-login as the mechanism for authentication.
- * @see [[https://tools.ietf.org/html/rfc6749]] to find what is OAuth2, and how it works.
- *
- * Basically flow for OAuth2 Authentication:
- *  1. User accesses Gearpump UI website, and choose to login with OAuth2 server.
- *  2. Gearpump UI website redirects user to OAuth2 server domain authorization endpoint.
- *  3. End user complete the authorization in the domain of OAuth2 server.
- *  4. OAuth2 server redirects user back to Gearpump UI server.
- *  5. Gearpump UI server verify the tokens and extract credentials from query
- *    parameters and form fields.
- *
- * NOTE: '''Thread-safety''' is a MUST requirement. Developer need to ensure the sub-class is
- *      thread-safe. Sub-class should have a parameterless constructor.
- *
- * NOTE:  OAuth2 Authenticator requires access of Internet. Please make sure HTTP proxy are
- * set properly if applied.
- *
- * Example: Config proxy when UI server is started on Windows:
- * {{{
- *   > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
- *      -Dhttps.proxyPort=8088
- *   > bin\services
- * }}}
- *
- * Example: Config proxy when UI server is started on Linux:
- * {{{
- *   $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 -Dhttps.proxyHost=xx.com
- *      -Dhttps.proxyPort=8088"
- *   $ bin/services
- * }}}
- */
-trait OAuth2Authenticator {
-
-  /**
-   * Inits authenticator with config which contains client ID, client secret, and etc..
-   *
-   * Typically, the client key and client secret is provided by OAuth2 Authorization server
-   * when user register an application there.
-   * See [[https://tools.ietf.org/html/rfc6749]] for definition of client, client Id,
-   * and client secret.
-   *
-   * See [[https://developer.github.com/v3/oauth/]] for an actual example of how Github
-   * use client key, and client secret.
-   *
-   * NOTE:  '''Thread-Safety''': Framework ensures this call is synchronized.
-   *
-   * @param config Client Id, client secret, callback URL and etc..
-   * @param executionContext ExecutionContext from hosting environment.
-   */
-  def init(config: Config, executionContext: ExecutionContext): Unit
-
-  /**
-   * Returns the OAuth Authorization URL so for redirection to that address to do OAuth2
-   * authorization.
-   *
-   * NOTE:  '''Thread-Safety''': This can be called in a multi-thread environment. Developer
-   *      need to ensure thread safety.
-   */
-  def getAuthorizationUrl: String
-
-  /**
-   * After authorization, OAuth2 server redirects user back with tokens. This verify the
-   * tokens, retrieve the profiles, and return [[UserSession]] information.
-   *
-   * NOTE:  This is an Async call.
-   *
-   * NOTE:  This call requires external internet access.
-   *
-   * NOTE:  '''Thread-Safety''': This can be called in a multi-thread environment. Developer
-   *      need to ensure thread safety.
-   *
-   * @param parameters HTTP Query and Post parameters, which typically contains Authorization code.
-   * @return UserSession if authentication pass.
-   */
-  def authenticate(parameters: Map[String, String]): Future[UserSession]
-
-  /**
-   * Clean up resource
-   */
-  def close(): Unit
-}
-
-object OAuth2Authenticator {
-
-  // Serves as a quick immutable lookup cache
-  private var providers = Map.empty[String, OAuth2Authenticator]
-
-  /**
-   * Load Authenticator from [[Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS]]
-   *
-   * @param provider, Name for the OAuth2 Authentication Service.
-   * @return Returns null if the OAuth2 Authentication is disabled.
-   */
-  def get(config: Config, provider: String, executionContext: ExecutionContext)
-    : OAuth2Authenticator = {
-
-    if (providers.contains(provider)) {
-      providers(provider)
-    } else {
-      val path = s"${Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATORS}.$provider"
-      val enabled = config.getBoolean(Constants.GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED)
-      if (enabled && config.hasPath(path)) {
-        this.synchronized {
-          if (providers.contains(provider)) {
-            providers(provider)
-          } else {
-            val authenticatorConfig = config.getConfig(path)
-            val authenticatorClass = authenticatorConfig.getString(
-              GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
-            val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass)
-            val authenticator = clazz.newInstance().asInstanceOf[OAuth2Authenticator]
-            authenticator.init(authenticatorConfig, executionContext)
-            providers += provider -> authenticator
-            authenticator
-          }
-        }
-      } else {
-        null
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
deleted file mode 100644
index e422d3f..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2.impl
-
-import java.util.concurrent.atomic.AtomicBoolean
-import scala.collection.mutable.StringBuilder
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
-
-import com.typesafe.config.Config
-import com.github.scribejava.core.builder.ServiceBuilderAsync
-import com.github.scribejava.core.builder.api.DefaultApi20
-import com.github.scribejava.core.model._
-import com.github.scribejava.core.oauth.OAuth20Service
-import com.github.scribejava.core.utils.OAuthEncoder
-import com.ning.http.client.AsyncHttpClientConfig
-
-import io.gearpump.security.Authenticator
-import io.gearpump.services.SecurityService.UserSession
-import io.gearpump.services.security.oauth2.OAuth2Authenticator
-import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
-import io.gearpump.util.Constants._
-import io.gearpump.util.Util
-
-/**
- * Uses Ning AsyncClient to connect to OAuth2 service.
- *
- * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information.
- */
-abstract class BaseOAuth2Authenticator extends OAuth2Authenticator {
-
-  // Authorize Url for end user to authorize
-  protected def authorizeUrl: String
-
-  // Used to fetch the Access Token.
-  protected def accessTokenEndpoint: String
-
-  // Protected resource Url to get the user profile
-  protected def protectedResourceUrl: String
-
-  // Extracts the username information from response of protectedResourceUrl
-  protected def extractUserName(body: String): String
-
-  // Scope required to access protectedResourceUrl
-  protected def scope: String
-
-  // OAuth2 endpoint definition for ScribeJava.
-  protected def oauth2Api(): DefaultApi20 = {
-    new BaseApi20(authorizeUrl, accessTokenEndpoint)
-  }
-
-  protected var oauthService: OAuth20Service = null
-
-  protected var executionContext: ExecutionContext = null
-
-  private var defaultPermissionLevel = Authenticator.Guest.permissionLevel
-
-  // Synchronization ensured by the caller
-  override def init(config: Config, executionContext: ExecutionContext): Unit = {
-    if (this.oauthService == null) {
-      val callback = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK)
-      val clientId = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID)
-      val clientSecret = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET)
-      defaultPermissionLevel = {
-        val role = config.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE)
-        role match {
-          case "guest" => Authenticator.Guest.permissionLevel
-          case "user" => Authenticator.User.permissionLevel
-          case "admin" => Authenticator.Admin.permissionLevel
-          case _ => Authenticator.UnAuthenticated.permissionLevel
-        }
-      }
-      this.oauthService = buildOAuth2Service(clientId, clientSecret, callback)
-      this.executionContext = executionContext
-    }
-  }
-
-  private val isClosed: AtomicBoolean = new AtomicBoolean(false)
-
-  override def close(): Unit = {
-    if (isClosed.compareAndSet(false, true)) {
-      if (null != oauthService && null != oauthService.getAsyncHttpClient()) {
-        oauthService.getAsyncHttpClient().close()
-      }
-    }
-  }
-
-  override def getAuthorizationUrl(): String = {
-    oauthService.getAuthorizationUrl()
-  }
-
-  protected def authenticateWithAccessToken(accessToken: OAuth2AccessToken): Future[UserSession] = {
-    val promise = Promise[UserSession]()
-    val request = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, oauthService)
-    oauthService.signRequest(accessToken, request)
-    request.sendAsync {
-      new OAuthAsyncRequestCallback[Response] {
-        override def onCompleted(response: Response): Unit = {
-          try {
-            val user = extractUserName(response.getBody)
-            promise.success(new UserSession(user, defaultPermissionLevel))
-          } catch {
-            case ex: Throwable =>
-              promise.failure(ex)
-          }
-        }
-
-        override def onThrowable(throwable: Throwable): Unit = {
-          promise.failure(throwable)
-        }
-      }
-    }
-    promise.future
-  }
-
-  protected def authenticateWithAuthorizationCode(code: String): Future[UserSession] = {
-
-    implicit val ec: ExecutionContext = executionContext
-
-    val promise = Promise[UserSession]()
-    oauthService.getAccessTokenAsync(code,
-
-      new OAuthAsyncRequestCallback[OAuth2AccessToken] {
-        override def onCompleted(accessToken: OAuth2AccessToken): Unit = {
-          authenticateWithAccessToken(accessToken).onComplete {
-            case Success(user) => promise.success(user)
-            case Failure(ex) => promise.failure(ex)
-          }
-        }
-
-        override def onThrowable(throwable: Throwable): Unit = {
-          promise.failure(throwable)
-        }
-      })
-    promise.future
-  }
-
-  override def authenticate(parameters: Map[String, String]): Future[UserSession] = {
-
-    val code = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE)
-    val accessToken = parameters.get(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN)
-
-    if (accessToken.isDefined) {
-      authenticateWithAccessToken(new OAuth2AccessToken(accessToken.get))
-    } else if (code.isDefined) {
-      authenticateWithAuthorizationCode(code.get)
-    } else {
-      // Fails authentication if code not exist
-      Future.failed(new Exception("Fail to authenticate user as there is no code parameter in URL"))
-    }
-  }
-
-  private def buildOAuth2Service(clientId: String, clientSecret: String, callback: String)
-    : OAuth20Service = {
-    val state: String = "state" + Util.randInt()
-    ScribeJavaConfig.setForceTypeOfHttpRequests(
-      ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS)
-    val clientConfig: AsyncHttpClientConfig = new AsyncHttpClientConfig.Builder()
-      .setMaxConnections(5)
-      .setUseProxyProperties(true)
-      .setRequestTimeout(60000)
-      .setAllowPoolingConnections(false)
-      .setPooledConnectionIdleTimeout(60000)
-      .setReadTimeout(60000).build
-
-    val service: OAuth20Service = new ServiceBuilderAsync()
-      .apiKey(clientId)
-      .apiSecret(clientSecret)
-      .scope(scope)
-      .state(state)
-      .callback(callback)
-      .asyncHttpClientConfig(clientConfig)
-      .build(oauth2Api())
-
-    service
-  }
-}
-
-object BaseOAuth2Authenticator {
-
-  class BaseApi20(authorizeUrl: String, accessTokenEndpoint: String) extends DefaultApi20 {
-    def getAccessTokenEndpoint: String = {
-      accessTokenEndpoint
-    }
-
-    def getAuthorizationUrl(config: OAuthConfig): String = {
-      val sb: StringBuilder = new StringBuilder(String.format(authorizeUrl,
-        config.getResponseType, config.getApiKey, OAuthEncoder.encode(config.getCallback),
-        OAuthEncoder.encode(config.getScope)))
-      val state: String = config.getState
-      if (state != null) {
-        sb.append('&').append(OAuthConstants.STATE).append('=').append(OAuthEncoder.encode(state))
-      }
-      sb.toString
-    }
-
-    override def createService(config: OAuthConfig): OAuth20Service = {
-      new OAuth20Service(this, config) {
-
-        protected override def createAccessTokenRequest[T <: AbstractRequest](
-            code: String, request: T): T = {
-          super.createAccessTokenRequest(code, request)
-
-          if (!getConfig.hasGrantType) {
-            request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
-          }
-
-          // Work-around for issue https://github.com/scribejava/scribejava/issues/641
-          request.addHeader("Content-Type", "application/x-www-form-urlencoded")
-          request
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
deleted file mode 100644
index 98e37ea..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2.impl
-
-import scala.concurrent.{ExecutionContext, Future, Promise}
-
-import com.typesafe.config.Config
-import com.github.scribejava.core.builder.api.DefaultApi20
-import com.github.scribejava.core.model._
-import com.github.scribejava.core.oauth.OAuth20Service
-import com.ning.http.client
-import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient}
-import spray.json.{JsString, _}
-import sun.misc.BASE64Encoder
-
-import io.gearpump.services.SecurityService.UserSession
-import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
-import io.gearpump.util.Constants._
-
-/**
- *
- * Does authentication with CloudFoundry UAA service. Currently it only
- * extract the email address of end user.
- *
- * For what is UAA,
- * See [[https://github.com/cloudfoundry/uaa for information about CloudFoundry UAA]]
- *      (User Account and Authentication Service)
- *
- * Pre-requisite steps to use this Authenticator:
- *
- * Step1: Register your website to UAA with tool uaac.
- *  1) Check tutorial on uaac at
- *    [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]]
- *
- *  2) Open a bash shell, set the UAA server by command `uaac target`
- *    {{{
- *    uaac target [your uaa server url]
- *    }}}
- *
- * NOTE: [your uaa server url] should match the uaahost settings in gear.conf
- *
- *  3) Login in as user admin by
- *     {{{
- *     uaac token client get admin -s MyAdminPassword
- *     }}}
- *
- *  4) Create a new Application (Client) in UAA,
- * {{{
- *   uaac client add [your_client_id]
- *     --scope "openid cloud_controller.read"
- *     --authorized_grant_types "authorization_code client_credentials refresh_token"
- *     --authorities "openid cloud_controller.read"
- *     --redirect_uri [your_redirect_url]
- *     --autoapprove true
- *     --secret [your_client_secret]
- * }}}
- *
- * Step2: Configure the OAuth2 information in gear.conf
- *
- *  1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
- * as true.
- *
- *  2) Navigate to section "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa"
- *
- *  3) Config gear.conf "gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section.
- * Please make sure class name, client ID, client Secret, and callback URL are set properly.
- *
- * NOTE:  The callback URL here should match what you set on CloudFoundry UAA in step1.
- *
- * Step3: Restart the UI service and try the "social login" button for UAA.
- *
- * NOTE:  OAuth requires Internet access, @see
- *       [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find tutorials to
- *       configure Internet proxy.
- *
- * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more background
- *     information of OAuth2.
- */
-class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator {
-
-  import io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._
-
-  private var host: String = null
-
-  protected override def authorizeUrl: String =
-    s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
-
-  protected override def accessTokenEndpoint: String = s"$host/oauth/token"
-
-  protected override def protectedResourceUrl: String = s"$host/userinfo"
-
-  protected override def scope: String = "openid,cloud_controller.read"
-
-  private var additionalAuthenticator: Option[AdditionalAuthenticator] = None
-
-  override def init(config: Config, executionContext: ExecutionContext): Unit = {
-    host = config.getString("uaahost")
-    super.init(config, executionContext)
-
-    if (config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)) {
-      val additionalAuthenticatorConfig = config.getConfig(ADDITIONAL_AUTHENTICATOR)
-      val authenticatorClass = additionalAuthenticatorConfig
-        .getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
-      val clazz = Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass)
-      val authenticator = clazz.newInstance().asInstanceOf[AdditionalAuthenticator]
-      authenticator.init(additionalAuthenticatorConfig, executionContext)
-      additionalAuthenticator = Option(authenticator)
-    }
-  }
-
-  protected override def extractUserName(body: String): String = {
-    val email = body.parseJson.asJsObject.fields("email").asInstanceOf[JsString]
-    email.value
-  }
-
-  protected override def oauth2Api(): DefaultApi20 = {
-    new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint)
-  }
-
-  protected override def authenticateWithAccessToken(accessToken: OAuth2AccessToken)
-    : Future[UserSession] = {
-
-    implicit val ec: ExecutionContext = executionContext
-
-    if (additionalAuthenticator.isDefined) {
-      super.authenticateWithAccessToken(accessToken).flatMap { user =>
-        additionalAuthenticator.get.authenticate(oauthService.getAsyncHttpClient, accessToken, user)
-      }
-    } else {
-      super.authenticateWithAccessToken(accessToken)
-    }
-  }
-}
-
-object CloudFoundryUAAOAuth2Authenticator {
-  private val RESPONSE_TYPE = "response_type"
-
-  val ADDITIONAL_AUTHENTICATOR_ENABLED = "additional-authenticator-enabled"
-  val ADDITIONAL_AUTHENTICATOR = "additional-authenticator"
-
-  private class CloudFoundryUAAService(authorizeUrl: String, accessTokenEndpoint: String)
-    extends BaseApi20(authorizeUrl, accessTokenEndpoint) {
-
-    private def base64(in: String): String = {
-      val encoder = new BASE64Encoder()
-      val utf8 = "UTF-8"
-      encoder.encode(in.getBytes(utf8))
-    }
-
-    override def createService(config: OAuthConfig): OAuth20Service = {
-      new OAuth20Service(this, config) {
-
-        protected override def createAccessTokenRequest[T <: AbstractRequest](
-            code: String, request: T): T = {
-          val config: OAuthConfig = getConfig()
-
-          request.addParameter(OAuthConstants.GRANT_TYPE, OAuthConstants.AUTHORIZATION_CODE)
-          request.addParameter(OAuthConstants.CODE, code)
-          request.addParameter(RESPONSE_TYPE, "token")
-          request.addParameter(OAuthConstants.REDIRECT_URI, config.getCallback)
-
-          // Work around issue https://github.com/scribejava/scribejava/issues/641
-          request.addHeader("Content-Type", "application/x-www-form-urlencoded")
-
-          // CloudFoundry requires a Authorization header encoded with client Id and secret.
-          val authorizationHeader = "Basic " + base64(config.getApiKey + ":" + config.getApiSecret)
-          request.addHeader("Authorization", authorizationHeader)
-          request
-        }
-      }
-    }
-  }
-
-  /**
-   * Additional authenticator to check more credential attributes of user before logging in.
-   * This authenticator is applied AFTER user pass the initial (default) authenticator.
-   */
-  trait AdditionalAuthenticator {
-
-    /**
-     * Initialization
-     *
-     * @param config Configurations specifically used for this authenticator.
-     * @param executionContext Execution Context to use to run futures.
-     */
-    def init(config: Config, executionContext: ExecutionContext): Unit
-
-    /**
-     *
-     * @param accessToken, the accessToken for the UAA
-     * @param user user session returned by previous authenticator
-     * @return an updated UserSession
-     */
-    def authenticate(
-        asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: UserSession)
-      : Future[UserSession]
-  }
-
-  val ORGANIZATION_URL = "organization-url"
-
-  class OrganizationAccessChecker extends AdditionalAuthenticator {
-    private var organizationUrl: String = null
-    private implicit var executionContext: ExecutionContext = null
-
-    override def init(config: Config, executionContext: ExecutionContext): Unit = {
-      this.organizationUrl = config.getString(ORGANIZATION_URL)
-      this.executionContext = executionContext
-    }
-
-    override def authenticate(asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken,
-        user: UserSession): Future[UserSession] = {
-
-      val promise = Promise[UserSession]()
-      val builder = asyncClient.prepareGet(organizationUrl)
-      builder.addHeader("Authorization", s"bearer ${accessToken.getAccessToken}")
-      builder.execute(new AsyncCompletionHandler[Unit] {
-        override def onCompleted(response: client.Response): Unit = {
-          if (response.getStatusCode == 200) {
-            promise.success(user)
-          } else {
-            promise.failure(new Exception(response.getResponseBody))
-          }
-        }
-      })
-      promise.future
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
deleted file mode 100644
index c2b18fd..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.security.oauth2.impl
-
-import com.github.scribejava.apis.google.GoogleJsonTokenExtractor
-import com.github.scribejava.core.builder.api.DefaultApi20
-import com.github.scribejava.core.extractors.TokenExtractor
-import com.github.scribejava.core.model._
-import spray.json._
-
-/**
- *
- * Does authentication with Google OAuth2 service. It only extract the email address
- * from user profile of Google.
- *
- * Pre-requisite steps to use this Authenticator:
- *
- * Step1: Register your website as an OAuth2 Application on Google
- *  1) Create an application representing your website at [[https://console.developers.google.com]]
- *
- *  2) In "API Manager" of your created application, enable API "Google+ API"
- *
- *  3) Create OAuth client ID for this application. In "Credentials" tab of "API Manager",
- * choose "Create credentials", and then select OAuth client ID. Follow the wizard
- * to set callback URL, and generate client ID, and client Secret. Callback URL is NOT optional.
- *
- * Step2: Configure the OAuth2 information in gear.conf
- *
- *  1) Enable OAuth2 authentication by setting "gearpump.ui-security.oauth2-authenticator-enabled"
- * as true.
- *
- *  2) Configure section "gearpump.ui-security.oauth2-authenticators.google". Please make sure
- * class name, client ID, client Secret, and callback URL are set properly.
- *
- * NOTE:  callback URL set here should match what is configured on Google in step1.
- *
- * Step3: Restart the UI service and try out the Google social login button in UI.
- *
- * NOTE:  OAuth requires Internet access, @see
- *       [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find some helpful tutorials
- *
- * NOTE:  Google use scope to define what data can be fetched by OAuth2. Currently we use profile
- *       [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may change the profile
- *       in future.
- *
- * TODO Currently, this doesn't verify the state from Google OAuth2 response.
- *
- * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more API information.
- */
-class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator {
-
-  import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._
-
-  protected override def authorizeUrl: String = AuthorizeUrl
-
-  protected override def accessTokenEndpoint: String = AccessEndpoint
-
-  protected override def protectedResourceUrl: String = ResourceUrl
-
-  protected override def scope: String = GoogleOAuth2Authenticator.Scope
-
-  protected override def extractUserName(body: String): String = {
-    val emails = body.parseJson.asJsObject.fields("emails").asInstanceOf[JsArray]
-    val email = emails.elements(0).asJsObject("Cannot find email account")
-      .fields("value").asInstanceOf[JsString].value
-    email
-  }
-
-  override def oauth2Api(): DefaultApi20 = new AsyncGoogleApi20(authorizeUrl, accessTokenEndpoint)
-}
-
-object GoogleOAuth2Authenticator {
-
-  import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._
-
-  // scalastyle:off line.size.limit
-  val AuthorizeUrl = "https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
-  // scalastyle:on line.size.limit
-  val AccessEndpoint = "https://www.googleapis.com/oauth2/v4/token"
-  val ResourceUrl = "https://www.googleapis.com/plus/v1/people/me"
-  val Scope = "https://www.googleapis.com/auth/userinfo.email"
-
-  private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String)
-    extends BaseApi20(authorizeUrl, accessEndpoint) {
-
-    override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] = {
-      GoogleJsonTokenExtractor.instance
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
deleted file mode 100644
index f95474e..0000000
--- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.services.util
-
-import upickle.Js
-
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.util.Graph
-
-/** Implicit upickle readers and writers for Graph[Int, String] and WorkerId. */
-object UpickleUtil {
-
-  // Every implicit val below carries an EXPLICIT type annotation; without it upickle may NOT
-  // infer the reader type automatically.
-  // See issue https://github.com/lihaoyi/upickle-pprint/issues/102
-
-  // Reads a graph from a JSON object with exactly two fields: the value of the first field is
-  // decoded as the vertex list, the value of the second as the (Int, String, Int) edge list.
-  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
-    upickle.default.Reader[Graph[Int, String]] {
-      case Js.Obj(vertexField, edgeField) =>
-        val vertices = upickle.default.readJs[List[Int]](vertexField._2)
-        val edges = upickle.default.readJs[List[(Int, String, Int)]](edgeField._2)
-        Graph(vertices, edges)
-    }
-
-  // Reads a WorkerId from a JSON string by handing the text to WorkerId.parse.
-  implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] {
-    case Js.Str(text) =>
-      WorkerId.parse(text)
-  }
-
-  // Writes a WorkerId as a JSON string holding the output of WorkerId.render.
-  implicit val workerIdWriter: upickle.default.Writer[WorkerId] = upickle.default.Writer[WorkerId] {
-    case id: WorkerId =>
-      Js.Str(WorkerId.render(id))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala
new file mode 100644
index 0000000..4d1ba22
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AdminService.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server.Directives._
+import akka.stream.Materializer
+
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+/**
+ * AdminService is for cluster-wide management. It is not related to any
+ * specific application.
+ *
+ * For example:
+ *  - Security management: Add user, remove user.
+ *  - Configuration management: Change configurations.
+ *  - Machine management: Add worker machines, remove worker machines, and add masters.
+ */
+
+// TODO: Add YARN resource manager capacities to add/remove machines.
+class AdminService(override val system: ActorSystem)
+  extends BasicService {
+
+  // Neutral is used as the path prefix, so the routes of this service are not placed under the
+  // default "api/<version>" prefix defined by BasicService.
+  protected override def prefix = Neutral
+
+  /**
+   * Single endpoint: POST terminate.
+   *
+   * The handler calls system.terminate() and replies without waiting for the termination to
+   * finish.
+   * NOTE(review): the reply is 404 Not Found although the request was acted upon. This looks
+   * unusual; confirm whether callers rely on it before changing the status code.
+   */
+  protected override def doRoute(implicit mat: Materializer) = {
+    (path("terminate") & post) {
+      system.terminate()
+      complete(StatusCodes.NotFound)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
new file mode 100644
index 0000000..1ca2306
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/AppMasterService.scala
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.http.scaladsl.model.{FormData, Multipart}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
+import akka.stream.Materializer
+import upickle.default.{read, write}
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppMasterSummary, GeneralAppMasterSummary}
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.services.AppMasterService.Status
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
+import org.apache.gearpump.streaming.appmaster.DagManager._
+import org.apache.gearpump.streaming.appmaster.StreamAppMasterSummary
+import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
+import org.apache.gearpump.util.ActorUtil.{askActor, askAppMaster}
+import org.apache.gearpump.util.FileDirective._
+import org.apache.gearpump.util.{Constants, Util}
+
+/**
+ * Management service for AppMaster
+ */
+class AppMasterService(val master: ActorRef,
+    val jarStore: JarStoreService, override val system: ActorSystem)
+  extends BasicService {
+
+  private val systemConfig = system.settings.config
+  // Handed to ClusterConfig.render by the two "config" routes below.
+  private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
+
+  /**
+   * Routes below "appmaster/<appId>", where appId is an integer path segment:
+   *
+   *  - dynamicdag (POST): applies the DAGOperation given as URL-encoded JSON in the "args"
+   *    query parameter; a multipart request may carry a "jar" file for the new processor.
+   *  - stallingtasks: asks the AppMaster for its stalling tasks.
+   *  - errors: asks the AppMaster for its last failure.
+   *  - restart (POST): asks the master to restart the application; always answers with a
+   *    serialized Status, also when the request fails.
+   *  - config: renders the AppMaster configuration.
+   *  - executor/<id>/config and executor/<id> (GET): executor configuration and summary.
+   *  - metrics/<path>: history metrics; query parameters select aggregator and read option.
+   *  - path end, GET: application data; "detail=true" selects the detailed summary.
+   *  - path end, DELETE: shuts the application down.
+   *
+   * Unless noted otherwise, a failed request to the master or AppMaster is passed to failWith.
+   */
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) {
+    appId => {
+      path("dynamicdag") {
+        parameters(ParamMagnet("args")) { args: String =>
+          // Sends the operation to the AppMaster and answers with the serialized result.
+          def replaceProcessor(dagOperation: DAGOperation): Route = {
+            onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) {
+              case Success(value) =>
+                complete(write(value))
+              case Failure(ex) =>
+                failWith(ex)
+            }
+          }
+
+          val msg = java.net.URLDecoder.decode(args, "UTF-8")
+          val dagOperation = read[DAGOperation](msg)
+          (post & entity(as[Multipart.FormData])) { _ =>
+            uploadFile { form =>
+              form.getFile("jar").map(_.file) match {
+                case Some(jarFile) =>
+                  // A jar was uploaded: store it and attach it to the new processor description.
+                  dagOperation match {
+                    case replace: ReplaceProcessor =>
+                      val description = replace.newProcessorDescription.copy(jar =
+                        Util.uploadJar(jarFile, jarStore))
+                      val dagOperationWithJar = replace.copy(newProcessorDescription = description)
+                      replaceProcessor(dagOperationWithJar)
+                  }
+                case None =>
+                  replaceProcessor(dagOperation)
+              }
+            }
+          } ~ (post & entity(as[FormData])) { _ =>
+            replaceProcessor(dagOperation)
+          }
+        }
+      } ~
+      path("stallingtasks") {
+        onComplete(askAppMaster[StallingTasks](master, appId, GetStallingTasks(appId))) {
+          case Success(value) =>
+            complete(write(value))
+          case Failure(ex) => failWith(ex)
+        }
+      } ~
+      path("errors") {
+        onComplete(askAppMaster[LastFailure](master, appId, GetLastFailure(appId))) {
+          case Success(value) =>
+            complete(write(value))
+          case Failure(ex) => failWith(ex)
+        }
+      } ~
+      path("restart") {
+        post {
+          onComplete(askActor[SubmitApplicationResult](master, RestartApplication(appId))) {
+            case Success(_) =>
+              complete(write(Status(true)))
+            case Failure(ex) =>
+              // Unlike the other routes, a failure is reported in the body, not via failWith.
+              complete(write(Status(false, ex.getMessage)))
+          }
+        }
+      } ~
+      path("config") {
+        onComplete(askActor[AppMasterConfig](master, QueryAppMasterConfig(appId))) {
+          case Success(value: AppMasterConfig) =>
+            // A null config is rendered as an empty JSON object.
+            val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
+            complete(config)
+          case Failure(ex) =>
+            failWith(ex)
+        }
+      } ~
+      // NOTE(review): the executor id is taken as a raw Segment and parsed with
+      // Integer.parseInt, which throws NumberFormatException for a non-numeric segment.
+      // Presumably Segment is used so that negative ids are accepted; confirm before switching
+      // to IntNumber.
+      pathPrefix("executor" / Segment) { executorIdString =>
+        path("config") {
+          val executorId = Integer.parseInt(executorIdString)
+          onComplete(askAppMaster[ExecutorConfig](master, appId, QueryExecutorConfig(executorId))) {
+            case Success(value) =>
+              val config = Option(value.config).map(ClusterConfig.render(_, concise))
+                .getOrElse("{}")
+              complete(config)
+            case Failure(ex) =>
+              failWith(ex)
+          }
+        } ~
+        pathEnd {
+          get {
+            val executorId = Integer.parseInt(executorIdString)
+            onComplete(askAppMaster[ExecutorSummary](master, appId,
+              GetExecutorSummary(executorId))) {
+              case Success(value) =>
+                complete(write(value))
+              case Failure(ex) =>
+                failWith(ex)
+            }
+          }
+        }
+      } ~
+      path("metrics" / RestPath) { path =>
+        parameterMap { optionMap =>
+          parameter("aggregator" ? "") { aggregator =>
+            parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption =>
+              val query = QueryHistoryMetrics(path.head.toString, readOption, aggregator, optionMap)
+              onComplete(askAppMaster[HistoryMetrics](master, appId, query)) {
+                case Success(value) =>
+                  complete(write(value))
+                case Failure(ex) =>
+                  failWith(ex)
+              }
+            }
+          }
+        }
+      } ~
+      pathEnd {
+        get {
+          parameter("detail" ? "false") { detail =>
+            // Anything that does not parse as a boolean is treated as "false".
+            val queryDetails = Try(detail.toBoolean).getOrElse(false)
+            val request = AppMasterDataDetailRequest(appId)
+            queryDetails match {
+              case true =>
+                onComplete(askAppMaster[AppMasterSummary](master, appId, request)) {
+                  case Success(value) =>
+                    // Each concrete summary type is written in its own branch.
+                    value match {
+                      case data: GeneralAppMasterSummary =>
+                        complete(write(data))
+                      case data: StreamAppMasterSummary =>
+                        complete(write(data))
+                    }
+                  case Failure(ex) =>
+                    failWith(ex)
+                }
+
+              case false =>
+                onComplete(askActor[AppMasterData](master, AppMasterDataRequest(appId))) {
+                  case Success(value) =>
+                    complete(write(value))
+                  case Failure(ex) =>
+                    failWith(ex)
+                }
+            }
+          }
+        }
+      } ~
+      pathEnd {
+        delete {
+          onComplete(askActor[ShutdownApplicationResult](master, ShutdownApplication(appId))) {
+            case Success(result) =>
+              // The outcome of the shutdown itself is reported in the body as a status/info map.
+              val output = if (result.appId.isSuccess) {
+                Map("status" -> "success", "info" -> null)
+              } else {
+                Map("status" -> "fail", "info" -> result.appId.failed.get.toString)
+              }
+              complete(write(output))
+            case Failure(ex) =>
+              failWith(ex)
+          }
+        }
+      }
+    }
+  }
+}
+
+object AppMasterService {
+  // Result payload written by the "restart" route of the AppMasterService class: `success`
+  // tells whether the request succeeded, `reason` carries the failure message and defaults to
+  // null when none is given.
+  case class Status(success: Boolean, reason: String = null)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala
new file mode 100644
index 0000000..3846c33
--- /dev/null
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/BasicService.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.services
+
+import scala.concurrent.ExecutionContext
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.headers.CacheDirectives.{`max-age`, `no-cache`}
+import akka.http.scaladsl.model.headers.`Cache-Control`
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.Route
+import akka.stream.Materializer
+
+import org.apache.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import org.apache.gearpump.services.util.UpickleUtil._
+
+// A service that exposes an akka-http Route.
+trait RouteService {
+  // The route served by this service.
+  def route: Route
+}
+
+/**
+ * Wraps the cache behavior, and some common utils.
+ */
+trait BasicService extends RouteService {
+
+  // Actor system supplied by the concrete service; its dispatcher backs `ec` below.
+  implicit def system: ActorSystem
+
+  // Timeout made implicitly available to the concrete service.
+  implicit def timeout: akka.util.Timeout = Constants.FUTURE_TIMEOUT
+
+  implicit def ec: ExecutionContext = system.dispatcher
+
+  // The service's own routes; `route` mounts them under `prefix`.
+  protected def doRoute(implicit mat: Materializer): Route
+
+  // Path prefix under which `doRoute` is mounted: a slash, "api", then the REST version.
+  protected def prefix = Slash ~ "api" / s"$REST_VERSION"
+
+  protected val LOG = LogUtil.getLogger(getClass)
+
+  // Whether caching is allowed. When false (the default), `route` wraps the service routes
+  // in respondWithHeader(noCacheHeader).
+  protected def cache = false
+  private val noCacheHeader = `Cache-Control`(`no-cache`, `max-age`(0L))
+
+  // Final route of the service: response encoding, then the prefix, then the optional no-cache
+  // header around the routes returned by `doRoute`.
+  def route: Route = encodeResponse {
+    extractMaterializer { implicit mat =>
+      rawPathPrefix(prefix) {
+        if (!cache) {
+          respondWithHeader(noCacheHeader) {
+            doRoute(mat)
+          }
+        } else {
+          doRoute(mat)
+        }
+      }
+    }
+  }
+}