You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:54 UTC

[44/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
new file mode 100644
index 0000000..28a4907
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster
+
+import akka.actor.{ActorSystem, ExtendedActorSystem}
+import akka.serialization.JavaSerializer
+
+import io.gearpump.google.common.io.BaseEncoding
+
+/**
+ * Immutable configuration
+ */
+final class UserConfig(private val _config: Map[String, String]) extends Serializable {
+
+  def withBoolean(key: String, value: Boolean): UserConfig = {
+    new UserConfig(_config + (key -> value.toString))
+  }
+
+  def withDouble(key: String, value: Double): UserConfig = {
+    new UserConfig(_config + (key -> value.toString))
+  }
+
+  def withFloat(key: String, value: Float): UserConfig = {
+    new UserConfig(_config + (key -> value.toString))
+  }
+
+  def withInt(key: String, value: Int): UserConfig = {
+    new UserConfig(_config + (key -> value.toString))
+  }
+
+  def withLong(key: String, value: Long): UserConfig = {
+    new UserConfig(_config + (key -> value.toString))
+  }
+
+  def withString(key: String, value: String): UserConfig = {
+    if (null == value) {
+      this
+    } else {
+      new UserConfig(_config + (key -> value))
+    }
+  }
+
+  def without(key: String): UserConfig = {
+    val config = _config - key
+    new UserConfig(config)
+  }
+
+  def filter(p: ((String, String)) => Boolean): UserConfig = {
+    val updated = _config.filter(p)
+    new UserConfig(updated)
+  }
+
+  def getBoolean(key: String): Option[Boolean] = {
+    _config.get(key).map(_.toBoolean)
+  }
+
+  def getDouble(key: String): Option[Double] = {
+    _config.get(key).map(_.toDouble)
+  }
+
+  def getFloat(key: String): Option[Float] = {
+    _config.get(key).map(_.toFloat)
+  }
+
+  def getInt(key: String): Option[Int] = {
+    _config.get(key).map(_.toInt)
+  }
+
+  def getLong(key: String): Option[Long] = {
+    _config.get(key).map(_.toLong)
+  }
+
+  def getString(key: String): Option[String] = {
+    _config.get(key)
+  }
+
+  def getBytes(key: String): Option[Array[Byte]] = {
+    _config.get(key).map(BaseEncoding.base64().decode(_))
+  }
+
+  def withBytes(key: String, value: Array[Byte]): UserConfig = {
+    if (null == value) {
+      this
+    } else {
+      this.withString(key, BaseEncoding.base64().encode(value))
+    }
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * This de-serializes the stored value into an object instance.
+   *
+   * To do de-serialization, this requires an implicit ActorSystem, as
+   * the ActorRef and possibly other akka classes deserialization
+   * requires an implicit ActorSystem.
+   *
+   * See Link:
+   * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
+   */
+
+  def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
+    val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
+    _config.get(key).map(BaseEncoding.base64().decode(_))
+      .map(serializer.fromBinary(_).asInstanceOf[T])
+  }
+
+  /**
+   * This serializes the object and stores it as a Base64-encoded string.
+   *
+   * To do serialization, this requires an implicit ActorSystem, as
+   * the ActorRef and possibly other akka classes serialization
+   * requires an implicit ActorSystem.
+   *
+   * See Link:
+   * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization
+   */
+  def withValue[T <: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = {
+
+    if (null == value) {
+      this
+    } else {
+      val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
+      val bytes = serializer.toBinary(value)
+      val encoded = BaseEncoding.base64().encode(bytes)
+      this.withString(key, encoded)
+    }
+  }
+  // scalastyle:on line.size.limit
+
+  def withConfig(other: UserConfig): UserConfig = {
+    if (null == other) {
+      this
+    } else {
+      new UserConfig(_config ++ other._config)
+    }
+  }
+}
+
+object UserConfig {
+
+  def empty: UserConfig = new UserConfig(Map.empty[String, String])
+
+  def apply(config: Map[String, String]): UserConfig = new UserConfig(config)
+
+  def unapply(config: UserConfig): Option[Map[String, String]] = Option(config._config)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
new file mode 100644
index 0000000..170e56a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems}
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{AppDescription, AppMasterContext}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * This serves as runtime environment for AppMaster.
+ * When starting an AppMaster, we need to setup the connection to master,
+ * and prepare other environments.
+ *
+ * This also extends the functionality of Master by providing a scheduler service for
+ * Executor Systems. AppMaster can ask Master for an executor system directly. Details like
+ * requesting resources, contacting a worker to start a process, and then starting an executor
+ * system are hidden from AppMaster.
+ *
+ * Please use AppMasterRuntimeEnvironment.props() to construct this actor.
+ */
+private[appmaster]
+class AppMasterRuntimeEnvironment(
+    appContextInput: AppMasterContext,
+    app: AppDescription,
+    masters: Iterable[ActorPath],
+    masterFactory: (AppId, MasterActorRef) => Props,
+    appMasterFactory: (AppMasterContext, AppDescription) => Props,
+    masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props)
+  extends Actor {
+
+  val appId = appContextInput.appId
+  private val LOG = LogUtil.getLogger(getClass, app = appId)
+
+  import scala.concurrent.duration._
+
+  private val master = context.actorOf(
+    masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds)))))
+  private val appContext = appContextInput.copy(masterProxy = master)
+
+  // Create appMaster proxy to receive command and forward to appmaster
+  private val appMaster = context.actorOf(appMasterFactory(appContext, app))
+  context.watch(appMaster)
+
+  private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData)
+  private val masterConnectionKeeper = context.actorOf(
+    masterConnectionKeeperFactory(master, registerAppMaster, self))
+  context.watch(masterConnectionKeeper)
+
+  def receive: Receive = {
+    case MasterConnected =>
+      LOG.info(s"Master is connected, start AppMaster ${appId}...")
+      appMaster ! StartAppMaster
+    case MasterStopped =>
+      LOG.error(s"Master is stopped, stop AppMaster ${appId}...")
+      context.stop(self)
+    case Terminated(actor) => actor match {
+      case `appMaster` =>
+        LOG.error(s"AppMaster ${appId} is stopped, shutdown myself")
+        context.stop(self)
+      case `masterConnectionKeeper` =>
+        LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself")
+        context.stop(self)
+      case _ => // Skip
+    }
+  }
+}
+
+object AppMasterRuntimeEnvironment {
+
+  def props(
+      masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext)
+    : Props = {
+
+    val master = (appId: AppId, masterProxy: MasterActorRef) =>
+      MasterWithExecutorSystemProvider.props(appId, masterProxy)
+
+    val appMaster = (appContext: AppMasterContext, app: AppDescription) =>
+      LazyStartAppMaster.props(appContext, app)
+
+    val masterConnectionKeeper = (master: MasterActorRef, registerAppMaster:
+      RegisterAppMaster, listener: ListenerActorRef) => Props(new MasterConnectionKeeper(
+        registerAppMaster, master, masterStatusListener = listener))
+
+    Props(new AppMasterRuntimeEnvironment(
+      appContextInput, app, masters, master, appMaster, masterConnectionKeeper))
+  }
+
+  /**
+   * This behaves like an AppMaster. Under the hood, it starts the real AppMaster in a lazy
+   * way. While the real AppMaster is not started yet, all messages are stashed. The stashed
+   * messages are forwarded to the real AppMaster once it is started.
+   *
+   * Please use LazyStartAppMaster.props to construct this actor
+   *
+   * @param appMasterProps  underlying AppMaster Props
+   */
+  private[appmaster]
+  class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash {
+
+    private val LOG = LogUtil.getLogger(getClass, app = appId)
+
+    // Starts in the stashing behavior; no null receive / constructor-time become needed.
+    def receive: Receive = startAppMaster
+
+    def startAppMaster: Receive = {
+      case StartAppMaster =>
+        val appMaster = context.actorOf(appMasterProps, "appmaster")
+        context.watch(appMaster)
+        context.become(terminationWatch(appMaster) orElse appMasterService(appMaster))
+        unstashAll()
+      case _ =>
+        stash()
+    }
+
+    def terminationWatch(appMaster: ActorRef): Receive = {
+      // Backticks compare against the watched ref instead of binding a fresh variable.
+      case Terminated(`appMaster`) =>
+        LOG.error("appmaster is stopped")
+        context.stop(self)
+    }
+
+    def appMasterService(appMaster: ActorRef): Receive = {
+      case msg => appMaster forward msg
+    }
+  }
+
+  private[appmaster]
+  object LazyStartAppMaster {
+    def props(appContext: AppMasterContext, app: AppDescription): Props = {
+      val appMasterProps = Props(Class.forName(app.appMaster), appContext, app)
+      Props(new LazyStartAppMaster(appContext.appId, appMasterProps))
+    }
+  }
+
+  private[appmaster] case object StartAppMaster
+
+  /**
+   * This enhances Master by providing a new service: StartExecutorSystems
+   *
+   * Please use MasterWithExecutorSystemProvider.props to construct this actor
+   *
+   */
+  private[appmaster]
+  class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props)
+    extends Actor {
+
+    val executorSystemProvider: ActorRef = context.actorOf(executorSystemProviderProps)
+
+    override def receive: Receive = {
+      case request: StartExecutorSystems =>
+        executorSystemProvider forward request
+      case msg =>
+        master forward msg
+    }
+  }
+
+  private[appmaster]
+  object MasterWithExecutorSystemProvider {
+    def props(appId: Int, master: ActorRef): Props = {
+
+      val executorSystemLauncher = (appId: Int, session: Session) =>
+        Props(new ExecutorSystemLauncher(appId, session))
+
+      val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher))
+
+      Props(new MasterWithExecutorSystemProvider(master, scheduler))
+    }
+  }
+
+  private[appmaster] type AppId = Int
+  private[appmaster] type MasterActorRef = ActorRef
+  private[appmaster] type ListenerActorRef = ActorRef
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
new file mode 100644
index 0000000..b3ec88c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.ActorRef
+import com.typesafe.config.Config
+
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.AppMasterRegisterData
+
+/** Run time info used to start an AppMaster */
+case class AppMasterRuntimeInfo(
+    appId: Int,
+    // AppName is the unique Id for an application
+    appName: String,
+    worker: ActorRef = null,
+    user: String = null,
+    submissionTime: TimeStamp = 0,
+    startTime: TimeStamp = 0,
+    finishTime: TimeStamp = 0,
+    config: Config = null)
+  extends AppMasterRegisterData

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
new file mode 100644
index 0000000..7240113
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import org.apache.gearpump.cluster.{AppDescription, AppJar}
+
+/**
+ * The state of a single application; it is distributed across the masters.
+ */
+case class ApplicationState(
+    appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar],
+    username: String, state: Any) extends Serializable {
+
+  /**
+   * Equal when appId and attemptId both match; the remaining fields are not compared.
+   */
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: ApplicationState =>
+        appId == that.appId && attemptId == that.attemptId
+      case _ =>
+        false
+    }
+  }
+
+  /** Derived from appId and attemptId only, so it stays consistent with equals. */
+  override def hashCode: Int = {
+    import akka.routing.MurmurHash._
+    extendHash(appId, attemptId, startMagicA, startMagicB)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala
new file mode 100644
index 0000000..5c44652
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{ActorRef, Address, PoisonPill}
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.util.ActorSystemBooter.BindLifeCycle
+
+case class WorkerInfo(workerId: WorkerId, ref: ActorRef)
+
+/**
+ * Configurations to start an executor system on remote machine
+ *
+ * @param address Remote address where we start an Actor System.
+ */
+case class ExecutorSystem(executorSystemId: Int, address: Address, daemon:
+    ActorRef, resource: Resource, worker: WorkerInfo) {
+  def bindLifeCycleWith(actor: ActorRef): Unit = {
+    daemon ! BindLifeCycle(actor)
+  }
+
+  def shutdown(): Unit = {
+    daemon ! PoisonPill
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
new file mode 100644
index 0000000..0c5dbff
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import scala.concurrent.duration._
+
+import akka.actor._
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
+import org.apache.gearpump.cluster.ExecutorJVMConfig
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session}
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil}
+
+/**
+ * This launches a single executor system on the target worker.
+ *
+ * Please use ExecutorSystemLauncher.props() to construct this actor
+ *
+ * @param session The session that requests the launch of the executor system
+ */
+private[appmaster]
+class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  val scheduler = context.system.scheduler
+  implicit val executionContext = context.dispatcher
+
+  private val systemConfig = context.system.settings.config
+  val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)
+
+  val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds,
+    self, LaunchExecutorSystemTimeout(session))
+
+  def receive: Receive = waitForLaunchCommand
+
+  def waitForLaunchCommand: Receive = {
+    case LaunchExecutorSystem(worker, executorSystemId, resource) =>
+      val launcherPath = ActorUtil.getFullPath(context.system, self.path)
+      val jvmConfig = Option(session.executorSystemJvmConfig)
+        .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull
+
+      val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig)
+      LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " +
+        s"slots: ${resource.slots} on worker $worker")
+
+      worker.ref ! launch
+      context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId))
+  }
+
+  def waitForActorSystemToStart(
+      replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int)
+    : Receive = {
+    case RegisterActorSystem(systemPath) =>
+      timeout.cancel()
+      LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}")
+      sender ! ActorSystemRegistered(worker.ref)
+      val system = ExecutorSystem(
+        launch.executorId, AddressFromURIString(systemPath), sender, launch.resource, worker)
+      replyTo ! LaunchExecutorSystemSuccess(system, session)
+      context.stop(self)
+    case ExecutorLaunchRejected(reason, ex) =>
+      timeout.cancel() // launch is over; the pending timeout must not fire later
+      LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex)
+      replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session)
+      context.stop(self)
+    case expired: LaunchExecutorSystemTimeout =>
+      LOG.error(s"The Executor ActorSystem $executorSystemId has not been started in time")
+      replyTo ! expired
+      context.stop(self)
+  }
+}
+
+private[appmaster]
+object ExecutorSystemLauncher {
+
+  case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource)
+
+  case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session)
+
+  case class LaunchExecutorSystemRejected(resource: Resource, reason: Any, session: Session)
+
+  case class LaunchExecutorSystemTimeout(session: Session)
+
+  private def getExecutorJvmConfig(conf: ExecutorSystemJvmConfig, systemName: String,
+      reportBack: String): ExecutorJVMConfig = {
+    Option(conf).map { jvmConf => // a null conf yields a null result
+      import jvmConf._
+      ExecutorJVMConfig(classPath, jvmArguments, classOf[ActorSystemBooter].getName,
+        Array(systemName, reportBack), jar, username, executorAkkaConfig)
+    }.orNull
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
new file mode 100644
index 0000000..d73cc2f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.concurrent.duration._
+
+import akka.actor._
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler._
+import org.apache.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * ExecutorSystem is also a type of resource, this class schedules ExecutorSystem for AppMaster.
+ * AppMaster can use this class to directly request live executor actor systems. The
+ * communication in the background with Master and Worker is hidden from AppMaster.
+ *
+ * Please use ExecutorSystemScheduler.props() to construct this actor
+ */
+private[appmaster]
+class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
+    executorSystemLauncher: (Int, Session) => Props) extends Actor {
+
+  private val LOG = LogUtil.getLogger(getClass, app = appId)
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  implicit val actorSystem = context.system
+  var currentSystemId = 0
+
+  var resourceAgents = Map.empty[Session, ActorRef]
+
+  def receive: Receive = {
+    clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
+  }
+
+  def clientCommands: Receive = {
+    case start: StartExecutorSystems =>
+      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
+        s"Resources(${start.resources.mkString(",")}))")
+      // Session is a case class used as the map key, so a session equal to a known one
+      // reuses the agent already registered for it instead of creating a new one.
+      val session = Session(sender(), start.executorSystemConfig)
+      val agent = resourceAgents.getOrElse(session,
+        context.actorOf(Props(new ResourceAgent(masterProxy, session))))
+      resourceAgents = resourceAgents + (session -> agent)
+
+      start.resources.foreach { resource =>
+        agent ! RequestResource(appId, resource)
+      }
+
+    case StopExecutorSystem(executorSystem) =>
+      executorSystem.shutdown()
+  }
+
+  def resourceAllocationMessageHandler: Receive = {
+    case ResourceAllocatedForSession(allocations, session) =>
+      if (isSessionAlive(session)) {
+        allocations.foreach { resourceAllocation =>
+          val ResourceAllocation(resource, worker, workerId) = resourceAllocation
+
+          val launcher = context.actorOf(executorSystemLauncher(appId, session))
+          launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
+          currentSystemId = currentSystemId + 1
+        }
+      }
+    case ResourceAllocationTimeOut(session) =>
+      if (isSessionAlive(session)) {
+        resourceAgents = resourceAgents - session
+        session.requestor ! StartExecutorSystemTimeout
+      }
+  }
+
+  def executorSystemMessageHandler: Receive = {
+    case LaunchExecutorSystemSuccess(system, session) =>
+      if (isSessionAlive(session)) {
+        LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
+        system.bindLifeCycleWith(self)
+        // The JVM config may be null (the launcher tolerates that), so guard the jar lookup.
+        val boundedJar = Option(session.executorSystemJvmConfig).flatMap(_.jar)
+        session.requestor ! ExecutorSystemStarted(system, boundedJar)
+      } else {
+        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
+          "Will shutdown the allocated system")
+        system.shutdown()
+      }
+    case LaunchExecutorSystemTimeout(session) =>
+      if (isSessionAlive(session)) {
+        LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout")
+        session.requestor ! StartExecutorSystemTimeout
+      }
+    case LaunchExecutorSystemRejected(resource, reason, session) =>
+      if (isSessionAlive(session)) {
+        LOG.error(s"Failed to launch executor system, due to $reason, " +
+          s"will ask master to allocate new resources $resource")
+        val retry = RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
+        resourceAgents.get(session).foreach(_ ! retry)
+      }
+  }
+
+  private def isSessionAlive(session: Session): Boolean = {
+    Option(session).exists(resourceAgents.contains) // null session => not alive
+  }
+}
+
+object ExecutorSystemScheduler {
+
+  case class StartExecutorSystems(
+      resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)
+
+  case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])
+
+  case class StopExecutorSystem(system: ExecutorSystem)
+
+  case object StartExecutorSystemTimeout
+
+  case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
+      jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)
+
+  /**
+   * For each client that asks for an executor system, the scheduler will create a session for it.
+   *
+   */
+  private[appmaster]
+  case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)
+
+  /**
+   * This is an agent that requests resources on behalf of a session
+   *
+   * @param session the original requester of the resource requests
+   */
+  private[appmaster]
+  class ResourceAgent(master: ActorRef, session: Session) extends Actor {
+    private var resourceRequestor: ActorRef = null
+    var timeOutClock: Cancellable = null
+    private var unallocatedResource: Int = 0
+
+    import context.dispatcher
+
+    import org.apache.gearpump.util.Constants._
+
+    val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)
+
+    def receive: Receive = {
+      case request: RequestResource =>
+        unallocatedResource += request.request.resource.slots
+        Option(timeOutClock).foreach(_.cancel()) // each new request restarts the timer
+        timeOutClock = context.system.scheduler.scheduleOnce(
+          timeout.seconds, self, ResourceAllocationTimeOut(session))
+        resourceRequestor = sender
+        master ! request
+      case ResourceAllocated(allocations) =>
+        unallocatedResource -= allocations.map(_.resource.slots).sum
+        resourceRequestor forward ResourceAllocatedForSession(allocations, session)
+      case _: ResourceAllocationTimeOut =>
+        if (unallocatedResource > 0) { // only report while slots are still outstanding
+          resourceRequestor ! ResourceAllocationTimeOut(session)
+          // We will not receive any ResourceAllocation after timeout
+          context.stop(self)
+        }
+    }
+  }
+
+  private[ExecutorSystemScheduler]
+  case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session)
+
+  private[ExecutorSystemScheduler]
+  case class ResourceAllocationTimeOut(session: Session)
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
new file mode 100644
index 0000000..fd2d11a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
+import org.apache.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Watches the liveness of Master.
+ *
+ * When Master is restarted, it sends RegisterAppMaster to the new Master instance.
+ * If Master is stopped, or a registration is not confirmed within 30 seconds, it sends
+ * MasterConnectionStatus.MasterStopped to the listener and stops itself.
+ *
+ * NOTE(review): this comment used to ask callers to construct the actor through
+ * MasterConnectionKeeper.props(), but the companion object defines no such factory.
+ *
+ * @param register registration message sent (and re-sent after a restart) through masterProxy
+ * @param masterProxy proxy this actor subscribes to with WatchMaster
+ * @param masterStatusListener receiver of the MasterConnected / MasterStopped notifications
+ */
+private[appmaster]
+class MasterConnectionKeeper(
+    register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef)
+  extends Actor {
+
+  import context.dispatcher
+
+  // MasterProxy announces its own shutdown with MasterProxy.MasterStopped, which is a
+  // different object than MasterConnectionStatus.MasterStopped imported at file level.
+  import org.apache.gearpump.cluster.master.MasterProxy.{MasterStopped => MasterProxyStopped}
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  // Subscribe self to masterProxy, so that it reports master restarts and its own stop.
+  masterProxy ! WatchMaster(self)
+
+  /**
+   * Sends the registration through the proxy and arms a 30 second timeout.
+   *
+   * @return handle to cancel the pending AppMasterRegisterTimeout self message
+   */
+  def registerAppMaster: Cancellable = {
+    masterProxy ! register
+    context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS),
+      self, AppMasterRegisterTimeout)
+  }
+
+  // The initial behavior is installed from the constructor, so `receive` is never consulted.
+  context.become(waitMasterToConfirm(registerAppMaster))
+
+  /** Behavior while a registration is pending; `cancelRegister` cancels its timeout. */
+  def waitMasterToConfirm(cancelRegister: Cancellable): Receive = {
+    case AppMasterRegistered(appId) =>
+      cancelRegister.cancel()
+      masterStatusListener ! MasterConnected
+      context.become(masterLivenessListener)
+    case AppMasterRegisterTimeout =>
+      cancelRegister.cancel()
+      masterStatusListener ! MasterStopped
+      context.stop(self)
+  }
+
+  /** Behavior once the registration has been confirmed by the master. */
+  def masterLivenessListener: Receive = {
+    case MasterRestarted =>
+      LOG.info("Master restarted, re-registering appmaster....")
+      context.become(waitMasterToConfirm(registerAppMaster))
+    // Accept the proxy's own stop notification as well as the status object; matching only
+    // the latter left the message sent by MasterProxy.postStop unhandled.
+    case MasterStopped | MasterProxyStopped =>
+      LOG.info("Master is dead, killing this AppMaster....")
+      masterStatusListener ! MasterStopped
+      context.stop(self)
+  }
+
+  // Never used as a behavior (see context.become above), but must not be null.
+  def receive: Receive = Actor.emptyBehavior
+}
+
+/** Messages used by, and emitted from, the MasterConnectionKeeper actor. */
+private[appmaster] object MasterConnectionKeeper {
+
+  // Self message scheduled whenever a registration is sent; receiving it while the
+  // confirmation is still pending means the registration timed out.
+  case object AppMasterRegisterTimeout
+
+  // Connection states reported to the masterStatusListener.
+  object MasterConnectionStatus {
+
+    case object MasterConnected
+
+    case object MasterStopped
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
new file mode 100644
index 0000000..245f1bc
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.Try
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.util.Timeout
+import com.typesafe.config.{Config, ConfigValueFactory}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
+
+/**
+ * ClientContext is a user facing util to submit/manage an application.
+ *
+ * When no ActorSystem is supplied, a private one is created and terminated again by
+ * close(); a supplied system is left running. When no master ActorRef is supplied, a
+ * MasterProxy actor is created on first use from the GEARPUMP_CLUSTER_MASTERS setting.
+ *
+ * TODO: add interface to query master here
+ *
+ * @param config configuration used for the client system, the jar store and submissions
+ * @param sys actor system to reuse, or null to create a private one
+ * @param _master master actor to talk to, or null to create a MasterProxy
+ */
+class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
+
+  def this(system: ActorSystem) = {
+    this(system.settings.config, system, null)
+  }
+
+  def this(config: Config) = {
+    this(config, null, null)
+  }
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)
+
+  // Reuse the caller's ActorSystem when there is one, otherwise start a private one.
+  implicit val system: ActorSystem =
+    Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
+  LOG.info(s"Starting system ${system.name}")
+  // Only a system created here is terminated by close().
+  val shouldCleanupSystem: Boolean = Option(sys).isEmpty
+
+  private val jarStoreService = JarStoreService.get(config)
+  jarStoreService.init(config, system)
+
+  // Lazy so that the proxy actor is only spawned once the master is actually needed.
+  private lazy val master: ActorRef = {
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .flatMap(Util.parseHostList)
+    val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
+      s"masterproxy${system.name}"))
+    LOG.info(s"Creating master proxy ${master} for master list: $masters")
+    master
+  }
+
+  /**
+   * Submits an application with default jar setting. Use java property "gearpump.app.jar" if
+   * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
+   * not send the jar across the wire.
+   */
+  def submit(app: Application): Int = {
+    submit(app, System.getProperty(GEARPUMP_APP_JAR))
+  }
+
+  /** Submits with the executor number taken from the system property (default 1). */
+  def submit(app: Application, jar: String): Int = {
+    submit(app, jar, getExecutorNum())
+  }
+
+  /**
+   * Submits an application.
+   *
+   * @param app application to submit
+   * @param jar local path of the application jar, or null when no jar has to be uploaded
+   * @param executorNum value stored under APPLICATION_EXECUTOR_NUMBER in the submission config
+   * @return the application id assigned by the master
+   */
+  def submit(app: Application, jar: String, executorNum: Int): Int = {
+    // Validate the name first: a rejected submission must not force the lazy `master`,
+    // which would leave a MasterProxy actor behind in a caller-owned ActorSystem.
+    val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
+    val client = getMasterClient
+    val submissionConfig = getSubmissionConfig(config)
+      .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
+    val appDescription =
+      AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
+    val appJar = Option(jar).map(loadFile)
+    client.submitApplication(appDescription, appJar)
+  }
+
+  // Falls back to 1 when the property is missing or not a number.
+  private def getExecutorNum(): Int = {
+    Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
+  }
+
+  private def getSubmissionConfig(config: Config): Config = {
+    ClusterConfig.filterOutDefaultConfig(config)
+  }
+
+  /** Asks the AppMaster of `appId` to replay and blocks until its reply arrives. */
+  def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    val result = Await.result(
+      ActorUtil.askAppMaster[ReplayApplicationResult](master,
+        appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
+    result
+  }
+
+  /** Sends `msg` to the AppMaster of `appId` and returns the pending reply. */
+  def askAppMaster[T](appId: Int, msg: Any): Future[T] = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    ActorUtil.askAppMaster[T](master, appId, msg)
+  }
+
+  def listApps: AppMastersData = {
+    val client = getMasterClient
+    client.listApplications
+  }
+
+  def shutdown(appId: Int): Unit = {
+    val client = getMasterClient
+    client.shutdownApplication(appId)
+  }
+
+  def resolveAppID(appId: Int): ActorRef = {
+    val client = getMasterClient
+    client.resolveAppId(appId)
+  }
+
+  /** Terminates the ActorSystem if, and only if, it was created by this context. */
+  def close(): Unit = {
+    if (shouldCleanupSystem) {
+      LOG.info(s"Shutting down system ${system.name}")
+      system.terminate()
+    }
+  }
+
+  // Copies the local jar into the jar store and describes the stored copy.
+  private def loadFile(jarPath: String): AppJar = {
+    val jarFile = new java.io.File(jarPath)
+    val path = jarStoreService.copyFromLocal(jarFile)
+    AppJar(jarFile.getName, path)
+  }
+
+  // Prepends "<prefix>_" when a non-empty prefix is given and validates the resulting name;
+  // on an invalid name this context is closed and an Exception is thrown.
+  private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
+    val fullName = Option(namePrefix).filter(_.nonEmpty).fold(appName)(_ + "_" + appName)
+    if (!Util.validApplicationName(fullName)) {
+      close()
+      val error = s"The application name $appName is not a proper name. An app name can " +
+        "be a sequence of letters, numbers or underscore character \"_\""
+      throw new Exception(error)
+    }
+    fullName
+  }
+
+  // A fresh client per call; its ask timeout is read from the config (default 90 seconds).
+  private def getMasterClient: MasterClient = {
+    val timeoutSeconds = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
+    new MasterClient(master, akka.util.Timeout(timeoutSeconds, TimeUnit.SECONDS))
+  }
+}
+
+/**
+ * Factory methods for ClientContext. Variants without a Config argument use
+ * ClusterConfig.default(); a missing ActorSystem or master is passed on as null.
+ */
+object ClientContext {
+
+  def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null)
+
+  def apply(system: ActorSystem): ClientContext = {
+    new ClientContext(ClusterConfig.default(), system, null)
+  }
+
+  def apply(system: ActorSystem, master: ActorRef): ClientContext = {
+    new ClientContext(ClusterConfig.default(), system, master)
+  }
+
+  def apply(config: Config): ClientContext = new ClientContext(config, null, null)
+
+  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
+    new ClientContext(config, system, master)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
new file mode 100644
index 0000000..77ebedf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.{Failure, Success}
+
+import akka.actor.ActorRef
+import akka.pattern.ask
+import akka.util.Timeout
+
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.{AppDescription, AppJar}
+
+/**
+ * Client to inter-operate with Master node.
+ *
+ * Every call asks the master and blocks the calling thread until the reply arrives or the
+ * ask fails (the ask itself is bounded by `timeout`). Replies are narrowed with `mapTo`,
+ * so a reply of an unexpected type fails the call right where the reply is received.
+ *
+ * NOTE: Stateless, thread safe
+ *
+ * @param master actor the requests are sent to
+ * @param timeout ask timeout applied to every request
+ */
+class MasterClient(master: ActorRef, timeout: Timeout) {
+  implicit val masterClientTimeout: Timeout = timeout
+
+  /**
+   * Submits the application and prints the assigned id to the console.
+   *
+   * @return the application id; a failure reported by the master is rethrown
+   */
+  def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
+    val result = Await.result(
+      (master ? SubmitApplication(app, appJar)).mapTo[SubmitApplicationResult],
+      Duration.Inf)
+    val appId = result.appId match {
+      case Success(appId) =>
+        // scalastyle:off println
+        Console.println(s"Submit application succeed. The application id is $appId")
+        // scalastyle:on println
+        appId
+      case Failure(ex) => throw ex
+    }
+    appId
+  }
+
+  /** Resolves the AppMaster actor of `appId`; a failure reported by the master is rethrown. */
+  def resolveAppId(appId: Int): ActorRef = {
+    val result = Await.result(
+      (master ? ResolveAppId(appId)).mapTo[ResolveAppIdResult], Duration.Inf)
+    result.appMaster match {
+      case Success(appMaster) => appMaster
+      case Failure(ex) => throw ex
+    }
+  }
+
+  /** Shuts down the application `appId`; a failure reported by the master is rethrown. */
+  def shutdownApplication(appId: Int): Unit = {
+    val result = Await.result(
+      (master ? ShutdownApplication(appId)).mapTo[ShutdownApplicationResult],
+      Duration.Inf)
+    result.appId match {
+      case Success(_) =>
+      case Failure(ex) => throw ex
+    }
+  }
+
+  /** Fetches the AppMastersData reply of the master. */
+  def listApplications: AppMastersData = {
+    val result = Await.result(
+      (master ? AppMastersDataRequest).mapTo[AppMastersData], Duration.Inf)
+    result
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
new file mode 100644
index 0000000..02c6f1a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.main.ArgumentsParser.Syntax
+
+/**
+ * Declares one command line option.
+ *
+ * @param description text appended to the option's line in the help output
+ * @param required when true, parsing fails if the option is absent from the arguments
+ * @param defaultValue value used when a non-required option is absent; without a default
+ *                     the option is set to the empty string
+ */
+case class CLIOption[+T](
+    description: String = "", required: Boolean = false, defaultValue: Option[T] = None)
+
+/**
+ * Result of parsing a command line.
+ *
+ * @param optionMap option name (without the leading "-") to its textual value
+ * @param remainArguments the arguments that were not consumed as options
+ */
+class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) {
+
+  // Looks the key up and names it in the error instead of failing with a bare "None.get".
+  private def valueOf(key: String): String =
+    optionMap.get(key).getOrElse(throw new NoSuchElementException(s"Option $key not found"))
+
+  /** Value of `key` as Int; throws NoSuchElementException if the key is absent. */
+  def getInt(key: String): Int = valueOf(key).toInt
+
+  /** Value of `key`; throws NoSuchElementException if the key is absent. */
+  def getString(key: String): String = valueOf(key)
+
+  /** Value of `key` as Boolean; throws NoSuchElementException if the key is absent. */
+  def getBoolean(key: String): Boolean = valueOf(key).toBoolean
+
+  /** True when `key` is present with a non-empty value. */
+  def exists(key: String): Boolean = optionMap.get(key).exists(_.nonEmpty)
+
+  def remainArgs: Array[String] = this.remainArguments
+}
+
+/**
+ * Parser for command line arguments
+ *
+ * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
+ *
+ * Implementations describe their command line by overriding `description`, `options`
+ * and `remainArgs`.
+ */
+trait ArgumentsParser {
+
+  val ignoreUnknownArgument = false
+
+  // scalastyle:off println
+  /** Prints the description, one line per option and the names of the remaining arguments. */
+  def help(): Unit = {
+    Console.println(s"\nHelp: $description")
+    val optionLines = options.toList.map { case (name, option) =>
+      if (option.required) {
+        s"-$name (required:${option.required})${option.description}"
+      } else {
+        s"-$name (required:${option.required}, " +
+          s"default:${option.defaultValue.getOrElse("")})${option.description}"
+      }
+    }
+    val usage = optionLines :+ remainArgs.map(k => s"<$k>").mkString(" ")
+    usage.foreach(Console.println(_))
+  }
+  // scalastyle:on println
+
+  /** Parses `args` against the syntax declared by this parser. */
+  def parse(args: Array[String]): ParseResult = {
+    ArgumentsParser.parse(Syntax(options, remainArgs, ignoreUnknownArgument), args)
+  }
+
+  val description: String = ""
+  val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
+  val remainArgs: Array[String] = Array.empty[String]
+}
+
+object ArgumentsParser {
+
+  /**
+   * What a command line may contain.
+   *
+   * @param options the known options, as (name, declaration) pairs
+   * @param remainArgs names of the expected trailing arguments
+   * @param ignoreUnknownArgument when true, unknown options are kept as remaining
+   *                              arguments instead of failing the parse
+   */
+  case class Syntax(
+      options: Array[(String, CLIOption[Any])], remainArgs: Array[String],
+      ignoreUnknownArgument: Boolean)
+
+  /**
+   * Parses `args` according to `syntax`.
+   *
+   * "-key value" sets an option, a "-key" that is not followed by a value sets it to
+   * "true", and the first argument that is neither ends option parsing: it and everything
+   * after it become the remaining arguments. Absent non-required options get their
+   * default value, or the empty string when they have none.
+   *
+   * @throws Exception on an unknown option (unless ignoreUnknownArgument is set), on a
+   *                   missing required option, or when fewer remaining arguments are
+   *                   present than the syntax names
+   */
+  def parse(syntax: Syntax, args: Array[String]): ParseResult = {
+    import syntax.{ignoreUnknownArgument, options, remainArgs}
+    val knownKeys = options.map(_._1).toSet
+    var config = Map.empty[String, String]
+    var remain = Array.empty[String]
+
+    def doParse(argument: List[String]): Unit = {
+      argument match {
+        case Nil => () // everything processed successfully
+
+        case key :: value :: rest if key.startsWith("-") && !value.startsWith("-") =>
+          val fixedKey = key.substring(1)
+          if (knownKeys.contains(fixedKey)) {
+            config += fixedKey -> value
+          } else if (ignoreUnknownArgument) {
+            remain ++= Array(key, value)
+          } else {
+            throw new Exception(s"found unknown option $fixedKey")
+          }
+          doParse(rest)
+
+        case key :: rest if key.startsWith("-") =>
+          val fixedKey = key.substring(1)
+          if (knownKeys.contains(fixedKey)) {
+            config += fixedKey -> "true"
+          } else if (ignoreUnknownArgument) {
+            // Treat an unknown flag like an unknown key/value pair: keep it for the caller
+            // instead of failing although unknown arguments are to be ignored.
+            remain :+= key
+          } else {
+            throw new Exception(s"found unknown option $fixedKey")
+          }
+          doParse(rest)
+
+        case value :: rest =>
+          // scalastyle:off println
+          Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class")
+          // scalastyle:on println
+          // Option parsing ends here; the rest is handed over untouched.
+          remain ++= (value :: rest)
+      }
+    }
+    doParse(args.toList)
+
+    // Fill in defaults for optional options that were not given.
+    options.foreach { case (key, option) =>
+      if (!config.contains(key) && !option.required) {
+        config += key -> option.defaultValue.getOrElse("").toString
+      }
+    }
+
+    // Whatever is still missing now is a required option.
+    options.foreach { case (key, _) =>
+      if (!config.contains(key)) {
+        throw new Exception(s"Missing option ${key}...")
+      }
+    }
+
+    if (remain.length < remainArgs.length) {
+      throw new Exception(s"Missing arguments ...")
+    }
+
+    new ParseResult(config, remain)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
new file mode 100644
index 0000000..9305d5c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.{TimeUnit, TimeoutException}
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+
+import akka.actor.{Actor, ActorRef, Props, _}
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo}
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.{AppDescription, AppJar, _}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.ActorSystemBooter._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
+
+/**
+ *
+ * AppMasterLauncher is a child Actor of AppManager, it is responsible
+ * to launch the AppMaster on the cluster.
+ *
+ * Steps: request one resource slot from master, launch an executor JVM on the allocated
+ * worker, create the AppMaster runtime environment inside the actor system that this JVM
+ * registers, then report the outcome to the client and stop.
+ */
+class AppMasterLauncher(
+    appId: Int, executorId: Int, app: AppDescription,
+    jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
+  extends Actor {
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+
+  val scheduler = context.system.scheduler
+  val systemConfig = context.system.settings.config
+  // How long to wait for the AppMaster actor to be created.
+  val TIMEOUT = Duration(15, TimeUnit.SECONDS)
+
+  val appMasterAkkaConfig: Config = app.clusterConfig
+
+  LOG.info(s"Ask Master resource to start AppMaster $appId...")
+  master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
+
+  def receive: Receive = waitForResourceAllocation
+
+  /** Step 1: waits for the resource and launches the executor on the allocated worker. */
+  def waitForResourceAllocation: Receive = {
+    case ResourceAllocated(allocations) =>
+
+      // Only the first allocation is used.
+      val ResourceAllocation(resource, worker, workerId) = allocations(0)
+      LOG.info(s"Resource allocated for appMaster $appId on worker ${workerId}(${worker.path})")
+
+      val submissionTime = System.currentTimeMillis()
+
+      val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username,
+        submissionTime, config = appMasterAkkaConfig)
+      val workerInfo = WorkerInfo(workerId, worker)
+      val appMasterContext =
+        AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
+      LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId")
+      val name = ActorUtil.actorNameForExecutor(appId, executorId)
+      val selfPath = ActorUtil.getFullPath(context.system, self.path)
+
+      val jvmSetting =
+        Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
+      val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
+        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
+        username, appMasterAkkaConfig)
+
+      worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
+      context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource))
+  }
+
+  /**
+   * Step 2: waits for the launched JVM to register its actor system and asks it to create
+   * the AppMaster runtime environment. A rejected launch goes back to step 1.
+   */
+  def waitForActorSystemToStart(
+      worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource)
+    : Receive = {
+    case ExecutorLaunchRejected(reason, ex) =>
+      LOG.error(s"Executor Launch failed reason: $reason", ex)
+      LOG.info(s"reallocate resource $resource to start appmaster")
+      master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
+      context.become(waitForResourceAllocation)
+    case RegisterActorSystem(systemPath) =>
+      LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")
+      sender ! ActorSystemRegistered(worker)
+
+      val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
+        .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
+      sender ! CreateActor(
+        AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
+
+      import context.dispatcher
+      // A creation that takes longer than TIMEOUT is treated like a failed creation.
+      val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
+        CreateActorFailed(app.appMaster, new TimeoutException))
+      context.become(waitForAppMasterToStart(worker, appMasterTimeout))
+  }
+
+  /** Step 3: waits for the AppMaster to be created; `cancel` cancels the creation timeout. */
+  def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
+    case ActorCreated(appMaster, _) =>
+      cancel.cancel()
+      sender ! BindLifeCycle(appMaster)
+      LOG.info(s"AppMaster is created, mission complete...")
+      replyToClient(SubmitApplicationResult(Success(appId)))
+      context.stop(self)
+    case CreateActorFailed(name, reason) =>
+      cancel.cancel()
+      // The timeout above carries an exception without a message, so getMessage may be
+      // null; fall back to the exception's string form to always send a usable reason.
+      val shutdownReason = Option(reason.getMessage).getOrElse(reason.toString)
+      worker ! ShutdownExecutor(appId, executorId, shutdownReason)
+      replyToClient(SubmitApplicationResult(Failure(reason)))
+      context.stop(self)
+  }
+
+  /** Sends the result to the client, if there is one, with master as the sender. */
+  def replyToClient(result: SubmitApplicationResult): Unit = {
+    client.foreach(_.tell(result, master))
+  }
+}
+
+/** Default AppMasterLauncherFactory; creates Props for the AppMasterLauncher actor. */
+object AppMasterLauncher extends AppMasterLauncherFactory {
+  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+    Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client))
+  }
+}
+
+/** Creates the Props of the actor that launches an AppMaster. */
+trait AppMasterLauncherFactory {
+  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
new file mode 100644
index 0000000..e2ee00e
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor._
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.{ActorUtil, LogUtil}
+
+/**
+ * This works with Master HA. When there are multiple Master nodes,
+ * this will find an active one.
+ *
+ * Messages received while no master is connected are stashed and replayed once a master
+ * answers; afterwards every message that the proxy does not handle itself is forwarded to
+ * the connected master. If no master answers within `timeout`, the proxy stops itself.
+ *
+ * @param masters actor paths of the candidate masters
+ * @param timeout how long one search for a master may take before the proxy gives up
+ */
+class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration)
+  extends Actor with Stash {
+  import org.apache.gearpump.cluster.master.MasterProxy._
+
+  val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name)
+
+  val contacts = masters.map { url =>
+    LOG.info(s"Contacts point URL: $url")
+    context.actorSelection(url)
+  }
+
+  // Actors to be told about MasterRestarted / MasterStopped.
+  var watchers: List[ActorRef] = List.empty[ActorRef]
+
+  import context.dispatcher
+
+  /** Starts identifying all contacts repeatedly; the result cancels the search. */
+  def findMaster(): Cancellable = {
+    repeatActionUtil(timeout) {
+      contacts foreach { contact =>
+        LOG.info(s"sending identity to $contact")
+        contact ! Identify(None)
+      }
+    }
+  }
+
+  // The initial behavior is installed from the constructor, `receive` below is a placeholder.
+  context.become(establishing(findMaster()))
+
+  LOG.info("Master Proxy is started...")
+
+  override def postStop(): Unit = {
+    watchers.foreach(_ ! MasterStopped)
+    super.postStop()
+  }
+
+  override def receive: Receive = {
+    case _ =>
+  }
+
+  /** Behavior while searching for a master; `findMaster` cancels the running search. */
+  def establishing(findMaster: Cancellable): Actor.Receive = {
+    case ActorIdentity(_, Some(receptionist)) =>
+      LOG.info("Connected to [{}]", receptionist.path)
+      // Watching once is enough to be told when this master terminates.
+      context.watch(receptionist)
+
+      watchers.foreach(_ ! MasterRestarted)
+      unstashAll()
+      findMaster.cancel()
+      context.become(active(receptionist) orElse messageHandler(receptionist))
+    case ActorIdentity(_, None) => // ok, use another instead
+    case msg =>
+      LOG.info(s"Stashing ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  /** Proxy-internal messages handled while connected to a master. */
+  def active(receptionist: ActorRef): Actor.Receive = {
+    // Matches any Terminated notification, not only the one of `receptionist`; the bound
+    // name is deliberately distinct so it does not shadow the parameter.
+    case Terminated(lostMaster) =>
+      LOG.info("Lost contact with [{}], restablishing connection", lostMaster)
+      context.become(establishing(findMaster()))
+    case _: ActorIdentity => // ok, from previous establish, already handled
+    case WatchMaster(watcher) =>
+      watchers = watchers :+ watcher
+  }
+
+  /** Forwards everything else to the connected master, keeping the original sender. */
+  def messageHandler(master: ActorRef): Receive = {
+    case msg =>
+      LOG.debug(s"Get msg ${msg.getClass.getSimpleName}, forwarding to ${master.path}")
+      master forward msg
+  }
+
+  def scheduler: Scheduler = context.system.scheduler
+  import scala.concurrent.duration._
+  // Runs `action` immediately and then every 2 seconds; after `timeout` the repetition is
+  // stopped and this actor is sent a PoisonPill. Cancelling the result cancels both.
+  private def repeatActionUtil(timeout: FiniteDuration)(action: => Unit): Cancellable = {
+    val send = scheduler.schedule(0.seconds, 2.seconds)(action)
+    val suicide = scheduler.scheduleOnce(timeout) {
+      send.cancel()
+      self ! PoisonPill
+    }
+
+    new Cancellable {
+      def cancel(): Boolean = {
+        val result1 = send.cancel()
+        val result2 = suicide.cancel()
+        result1 && result2
+      }
+
+      def isCancelled: Boolean = {
+        send.isCancelled && suicide.isCancelled
+      }
+    }
+  }
+}
+
+/** Messages understood or sent by MasterProxy, and the factory for its Props. */
+object MasterProxy {
+  // Sent to the watchers each time the proxy connects to a master.
+  case object MasterRestarted
+  // Sent to the watchers when the proxy itself stops.
+  case object MasterStopped
+  // Registers `watcher` for the two notifications above.
+  case class WatchMaster(watcher: ActorRef)
+
+  import scala.concurrent.duration._
+  // `duration` is handed to MasterProxy as its timeout; it defaults to 30 seconds.
+  def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = {
+    val contacts = masters.map(ActorUtil.getMasterActorPath(_))
+    Props(new MasterProxy(contacts, duration))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
new file mode 100644
index 0000000..d182b22
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
+/** Master status. Synced means all masters are live and synced. */
+object MasterStatus {
+  // Status values are plain strings; `Type` names that representation.
+  type Type = String
+
+  val Synced: Type = "synced"
+  val UnSynced: Type = "unsynced"
+}
+
+/** Host and port of a master node. */
+case class MasterNode(host: String, port: Int) {
+
+  /** This node as a `(host, port)` pair. */
+  def toTuple: (String, Int) = host -> port
+}
+
+/**
+ * Master information returned for REST API call.
+ *
+ * @param aliveFor NOTE(review): the time unit is not visible here, confirm at the producer
+ * @param masterStatus a [[MasterStatus.Type]], i.e. a plain string; presumably one of
+ *                     `MasterStatus.Synced` / `MasterStatus.UnSynced` (confirm at the producer)
+ * @param historyMetricsConfig defaults to `null` when not supplied, so readers must
+ *                             null-check it
+ */
+case class MasterSummary(
+    leader: MasterNode,
+    cluster: List[MasterNode],
+    aliveFor: Long,
+    logFile: String,
+    jarStore: String,
+    masterStatus: MasterStatus.Type,
+    homeDirectory: String,
+    activities: List[MasterActivity],
+    jvmName: String,
+    historyMetricsConfig: HistoryMetricsConfig = null)
+
+/** One entry of `MasterSummary.activities`: an event string and its time value. */
+case class MasterActivity(
+    time: Long,
+    event: String)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
new file mode 100644
index 0000000..c21d396
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.worker.WorkerId
+
+/**
+ * An amount of resource expressed as a number of slots.
+ *
+ * Arithmetic and comparisons operate on the `slots` count only.
+ */
+case class Resource(slots: Int) {
+
+  // scalastyle:off spaces.after.plus
+  def +(other: Resource): Resource = Resource(slots + other.slots)
+  // scalastyle:on spaces.after.plus
+
+  def -(other: Resource): Resource = Resource(slots - other.slots)
+
+  def >(other: Resource): Boolean = slots > other.slots
+
+  def >=(other: Resource): Boolean = slots >= other.slots
+
+  def <(other: Resource): Boolean = slots < other.slots
+
+  def <=(other: Resource): Boolean = slots <= other.slots
+
+  /** True when there are exactly zero slots. */
+  def isEmpty: Boolean = slots == 0
+}
+
+/**
+ * Each streaming job can have a priority, the job with higher priority
+ * will get scheduled resource earlier than those with lower priority.
+ *
+ * Values are declared in ascending order: LOW, NORMAL, HIGH.
+ */
+object Priority extends Enumeration {
+  type Priority = Value
+
+  val LOW = Value
+  val NORMAL = Value
+  val HIGH = Value
+}
+
+/**
+ * Relaxation.ONEWORKER means only resource (slot) from that worker will be accepted by
+ * the requestor application job.
+ *
+ * NOTE(review): SPECIFICWORKER is declared below but not described here; confirm how it
+ * differs from ONEWORKER.
+ */
+object Relaxation extends Enumeration {
+  type Relaxation = Value
+
+  // Option ONEWORKER allows the user to schedule a task on a specific worker.
+  val ANY, ONEWORKER, SPECIFICWORKER = Value
+}
+
+import org.apache.gearpump.cluster.scheduler.Priority._
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+
+/**
+ * A request for `resource`, associated with the worker `workerId`.
+ *
+ * When omitted, `priority` is NORMAL, `relaxation` is ANY and `executorNum` is 1.
+ */
+case class ResourceRequest(
+    resource: Resource,
+    workerId: WorkerId,
+    priority: Priority = NORMAL,
+    relaxation: Relaxation = ANY,
+    executorNum: Int = 1)
+
+/** A `resource` allocation together with the `worker` actor and its `workerId`. */
+case class ResourceAllocation(
+    resource: Resource,
+    worker: ActorRef,
+    workerId: WorkerId)
+
+object Resource {
+
+  /** A resource with zero slots. */
+  def empty: Resource = Resource(0)
+
+  /** The argument with fewer slots; `res2` when both have the same number of slots. */
+  def min(res1: Resource, res2: Resource): Resource = {
+    if (res2.slots <= res1.slots) res2 else res1
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala
new file mode 100644
index 0000000..58f7dd8
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.RichProcess
+
+/**
+ * ExecutorProcessLauncher is used to launch a process for Executor using given parameters.
+ *
+ * User can implement this interface to decide the behavior of launching a process.
+ * Set "gearpump.worker.executor-process-launcher" to your implemented class name.
+ */
+trait ExecutorProcessLauncher {
+  val config: Config
+
+  /**
+   * This function launches a process for Executor using given parameters.
+   *
+   * @param appId The appId of the executor to be launched
+   * @param executorId The executorId of the executor to be launched
+   * @param resource The resource allocated for that executor
+   * @param config The configuration passed for this launch. NOTE(review): this parameter
+   *               shadows the trait member `config` inside implementations; confirm which
+   *               one implementers are expected to read.
+   * @param options The command options
+   * @param classPath The classpath of the process
+   * @param mainClass The main class of the process
+   * @param arguments The rest arguments
+   * @return The launched process, as a RichProcess
+   */
+  def createProcess(
+      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess
+
+  /**
+   * This function will clean resources for a launched process.
+   * @param appId The appId of the launched executor
+   * @param executorId The executorId of launched executor
+   */
+  def cleanProcess(appId: Int, executorId: Int): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala
new file mode 100644
index 0000000..93213e4
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+/**
+ * WorkerId is used to uniquely track a worker machine.
+ *
+ * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that
+ *                  sessionId is **NOT** unique, so always use WorkerId for comparison.
+ * @param registerTime the timestamp when a worker node registers itself to the master node
+ */
+case class WorkerId(sessionId: Int, registerTime: Long)
+
+object WorkerId {
+
+  /** Placeholder id with sessionId -1 and registerTime 0. */
+  val unspecified: WorkerId = WorkerId(sessionId = -1, registerTime = 0L)
+
+  /** Renders `workerId` as its register time and session id joined by "_". */
+  def render(workerId: WorkerId): String = {
+    s"${workerId.registerTime}_${workerId.sessionId}"
+  }
+
+  /**
+   * Inverse of `render`: splits `str` on "_" and reads the first field as registerTime and
+   * the second field as sessionId.
+   */
+  def parse(str: String): WorkerId = {
+    val fields = str.split("_")
+    WorkerId(sessionId = fields(1).toInt, registerTime = fields(0).toLong)
+  }
+
+  implicit val workerIdOrdering: Ordering[WorkerId] = {
+    new Ordering[WorkerId] {
+
+      /** Compare timestamp first, then id */
+      override def compare(x: WorkerId, y: WorkerId): Int = {
+        val byTime = Ordering.Long.compare(x.registerTime, y.registerTime)
+        if (byTime != 0) byTime else Ordering.Int.compare(x.sessionId, y.sessionId)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala
new file mode 100644
index 0000000..f4c0587
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
+/**
+ * Worker summary information for REST API.
+ *
+ * `historyMetricsConfig` defaults to `null` when not supplied.
+ */
+case class WorkerSummary(
+    workerId: WorkerId,
+    state: String,
+    actorPath: String,
+    aliveFor: Long,
+    logFile: String,
+    executors: Array[ExecutorSlots],
+    totalSlots: Int,
+    availableSlots: Int,
+    homeDirectory: String,
+    jvmName: String,
+    // Id used to uniquely identify this worker process in low level resource manager like YARN.
+    resourceManagerContainerId: String,
+    historyMetricsConfig: HistoryMetricsConfig = null)
+
+object WorkerSummary {
+
+  /**
+   * A placeholder summary: unspecified worker id, empty strings, zero counters, no
+   * executors, and `historyMetricsConfig` left at its default.
+   */
+  def empty: WorkerSummary = {
+    WorkerSummary(
+      workerId = WorkerId.unspecified,
+      state = "",
+      actorPath = "",
+      aliveFor = 0L,
+      logFile = "",
+      executors = Array.empty[ExecutorSlots],
+      totalSlots = 0,
+      availableSlots = 0,
+      homeDirectory = "",
+      jvmName = "",
+      resourceManagerContainerId = "")
+  }
+}
+
+/** Element type of `WorkerSummary.executors`: a slot count keyed by appId and executorId. */
+case class ExecutorSlots(
+    appId: Int,
+    executorId: Int,
+    slots: Int)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
new file mode 100644
index 0000000..0ba9558
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.net.URI
+import java.util.ServiceLoader
+import scala.collection.JavaConverters._
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+
+import org.apache.gearpump.util.{Constants, Util}
+
+/** Path of a file inside a jar store, as returned by `JarStoreService.copyFromLocal`. */
+case class FilePath(path: String)
+
+/**
+ * JarStoreService is used to manage the upload/download of binary files,
+ * like user submitted application jar.
+ */
+trait JarStoreService {
+  /**
+   * The scheme of the JarStoreService.
+   * Like "hdfs" for HDFS file system, and "file" for a local
+   * file system.
+   */
+  val scheme: String
+
+  /**
+   * Init the Jar Store.
+   *
+   * @param config The configuration to initialize the store from
+   * @param system The actor system made available to the store
+   */
+  def init(config: Config, system: ActorSystem): Unit
+
+  /**
+   * This function will copy the local file to the remote JarStore, called from client side.
+   *
+   * @param localFile The local file
+   * @return The path of the copy in the JarStore
+   */
+  def copyFromLocal(localFile: File): FilePath
+
+  /**
+   * This function will copy the remote file to local file system, called from client side.
+   *
+   * @param localFile The destination of file path
+   * @param remotePath The remote file path from JarStore
+   */
+  def copyToLocalFile(localFile: File, remotePath: FilePath): Unit
+}
+
+object JarStoreService {
+
+  /**
+   * Get an active JarStoreService by specifying a scheme.
+   *
+   * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for
+   * more information.
+   *
+   * @param config Configuration holding the jar store root path
+   * @return The loaded JarStoreService whose scheme equals the scheme of the root path
+   * @throws NoSuchElementException if no loaded JarStoreService has that scheme
+   */
+  def get(config: Config): JarStoreService = {
+    val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+    get(jarStoreRootPath)
+  }
+
+  /** Implementations discovered through java.util.ServiceLoader, loaded on first use. */
+  private lazy val jarstoreServices: List[JarStoreService] = {
+    ServiceLoader.load(classOf[JarStoreService]).asScala.toList
+  }
+
+  /** Finds the service whose scheme equals the URI scheme of the resolved `rootPath`. */
+  private def get(rootPath: String): JarStoreService = {
+    val scheme = new URI(Util.resolvePath(rootPath)).getScheme
+    jarstoreServices.find(_.scheme == scheme).getOrElse {
+      // Same exception type as Option.get, but with an actionable message.
+      val known = jarstoreServices.map(_.scheme).mkString(", ")
+      throw new NoSuchElementException(
+        s"No JarStoreService found for scheme '$scheme' of root path '$rootPath', " +
+          s"available schemes: [$known]")
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
new file mode 100644
index 0000000..7010048
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import scala.collection.JavaConverters._
+
+import akka.actor.{ActorRef, ActorSystem}
+
+import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry}
+import org.apache.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData}
+import org.apache.gearpump.metrics.MetricsReporterService.ReportTo
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * A reporter class for logging metrics values to a remote actor periodically
+ */
+class AkkaReporter(
+    system: ActorSystem,
+    registry: MetricRegistry)
+  extends ReportTo {
+  private val LOG = LogUtil.getLogger(getClass)
+  LOG.info("Start Metrics AkkaReporter")
+
+  /**
+   * Sends the current value of every counter, histogram, meter and gauge in the registry
+   * to `to`, one message per metric.
+   *
+   * Gauges whose value is not a java.lang.Number (including null) are skipped with a
+   * warning instead of failing the whole report.
+   *
+   * @param to the actor that receives the metric messages
+   */
+  override def report(to: ActorRef): Unit = {
+    val counters = registry.getCounters()
+    val histograms = registry.getHistograms()
+    val meters = registry.getMeters()
+    val gauges = registry.getGauges()
+
+    counters.entrySet().asScala.foreach { pair =>
+      to ! CounterData(pair.getKey, pair.getValue.getCount)
+    }
+
+    histograms.entrySet().asScala.foreach { pair =>
+      val key = pair.getKey
+      val value = pair.getValue
+      val s = value.getSnapshot
+      to ! HistogramData(
+        key, s.getMean, s.getStdDev, s.getMedian,
+        s.get95thPercentile, s.get99thPercentile, s.get999thPercentile)
+    }
+
+    meters.entrySet().asScala.foreach { pair =>
+      val key = pair.getKey
+      val value = pair.getValue
+      to ! MeterData(key,
+        value.getCount,
+        value.getMeanRate,
+        value.getOneMinuteRate,
+        getRateUnit)
+    }
+
+    gauges.entrySet().asScala.foreach { kv =>
+      // A gauge may hold any value type; only numeric values can be reported as a long.
+      kv.getValue.getValue match {
+        case number: Number =>
+          to ! GaugeData(kv.getKey, number.longValue())
+        case other =>
+          LOG.warn(s"Skip gauge ${kv.getKey}, value $other is not a number")
+      }
+    }
+  }
+
+  /** Unit label attached to every reported meter. */
+  private def getRateUnit: String = {
+    "events/s"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala
new file mode 100644
index 0000000..a6ce799
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter}
+
+/**
+ * Sampling wrapper around a codahale Counter: increments are accumulated locally and
+ * pushed to the underlying counter once every `sampleRate` calls.
+ *
+ * State is kept in plain vars without synchronization.
+ *
+ * @param name name of this counter
+ * @param counter underlying counter; when null, increments are accumulated but never pushed
+ * @param sampleRate number of `inc` calls between two pushes; 1 pushes on every call
+ * @see io.gearpump.codahale.metrics.Counter
+ */
+class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
+  private var sampleCount = 0L
+  private var toBeIncremented = 0L
+
+  /** Increments by one. */
+  def inc(): Unit = {
+    inc(1)
+  }
+
+  /**
+   * Adds `n` to the pending amount and, on every `sampleRate`-th call, pushes the pending
+   * amount to the underlying counter and resets it.
+   */
+  def inc(n: Long): Unit = {
+    toBeIncremented += n
+    sampleCount += 1
+    if (null != counter && sampleCount % sampleRate == 0) {
+      counter.inc(toBeIncremented)
+      toBeIncremented = 0
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala
new file mode 100644
index 0000000..7d7e19b
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram}
+
+/**
+ * Sampling wrapper around a codahale Histogram: only every `sampleRate`-th value passed to
+ * `update` is recorded in the underlying histogram.
+ *
+ * A null underlying histogram is tolerated: updates are dropped and the statistics below
+ * report 0.0.
+ *
+ * @param name name of this histogram
+ * @param histogram underlying histogram, may be null
+ * @param sampleRate number of `update` calls per recorded value; 1 records every value
+ * @see io.gearpump.codahale.metrics.Histogram
+ */
+class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
+  private var sampleCount = 0L
+
+  /** Counts the call and records `value` on every `sampleRate`-th call. */
+  def update(value: Long): Unit = {
+    sampleCount += 1
+    if (null != histogram && sampleCount % sampleRate == 0) {
+      histogram.update(value)
+    }
+  }
+
+  /** Mean of the current snapshot, or 0.0 when there is no underlying histogram. */
+  def getMean(): Double = {
+    if (null != histogram) histogram.getSnapshot.getMean else 0.0
+  }
+
+  /** Standard deviation of the current snapshot, or 0.0 without an underlying histogram. */
+  def getStdDev(): Double = {
+    if (null != histogram) histogram.getSnapshot.getStdDev else 0.0
+  }
+}
\ No newline at end of file