Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:47 UTC

[37/49] incubator-gearpump git commit: fix GEARPUMP-118 change package name to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
new file mode 100644
index 0000000..435b83e
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+import akka.actor._
+import akka.pattern.ask
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
+import org.apache.gearpump.cluster.master.Master._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
+
+/**
+ * AppManager is a dedicated child of the Master that manages all applications.
+ */
+private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
+  extends Actor with Stash with TimeOutScheduler {
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID
+  private val appMasterMaxRetries: Int = 5
+  private val appMasterRetryTimeRange: Duration = 20.seconds
+
+  implicit val timeout = FUTURE_TIMEOUT
+  implicit val executionContext = context.dispatcher
+
+  // Next available appId
+  private var appId: Int = 1
+
+  // Maps appId to the AppMaster's ActorRef and runtime info
+  private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+
+  // Dead appmaster list
+  private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+
+  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
+
+  def receive: Receive = null
+
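+  // Recover the AppManager state from the replicated KV store before serving requests;
+  // messages that arrive in the meantime are stashed and replayed afterwards.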
+  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
+  context.become(waitForMasterState)
+
+  def waitForMasterState: Receive = {
+    case GetKVSuccess(_, result) =>
+      val masterState = result.asInstanceOf[MasterState]
+      if (masterState != null) {
+        this.appId = masterState.maxId + 1
+        this.deadAppMasters = masterState.deadAppMasters
+        this.appMasterRegistry = masterState.appMasterRegistry
+      }
+      context.become(receiveHandler)
+      unstashAll()
+    case GetKVFailed(ex) =>
+      LOG.error("Failed to get master state, shutting down master to avoid data corruption...")
+      context.parent ! PoisonPill
+    case msg =>
+      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  def receiveHandler: Receive = {
+    val msg = "Application Manager started. Ready for application submission..."
+    LOG.info(msg)
+    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
+      appDataStoreService orElse terminationWatch
+  }
+
+  def clientMsgHandler: Receive = {
+    case SubmitApplication(app, jar, username) =>
+      LOG.info(s"Submit Application ${app.name}($appId) by $username...")
+      val client = sender
+      if (applicationNameExist(app.name)) {
+        client ! SubmitApplicationResult(Failure(
+          new Exception(s"Application name ${app.name} already existed")))
+      } else {
+        context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent,
+          Some(client)), s"launcher${appId}_${Util.randInt()}")
+
+        val appState = new ApplicationState(appId, app.name, 0, app, jar, username, null)
+        appMasterRestartPolicies += appId ->
+          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+        kvService ! PutKV(appId.toString, APP_STATE, appState)
+        appId += 1
+      }
+
+    case RestartApplication(appId) =>
+      val client = sender()
+      (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(_, result) =>
+          val appState = result.asInstanceOf[ApplicationState]
+          if (appState != null) {
+            LOG.info(s"Shutting down the application (restart), $appId")
+            self ! ShutdownApplication(appId)
+            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
+          } else {
+            client ! SubmitApplicationResult(Failure(
+              new Exception(s"Failed to restart, because the application $appId does not exist.")
+            ))
+          }
+        case GetKVFailed(ex) =>
+          client ! SubmitApplicationResult(Failure(
+            new Exception(s"Unable to obtain the Master State. " +
+              s"Application $appId will not be restarted.")
+          ))
+      }
+
+    case ShutdownApplication(appId) =>
+      LOG.info(s"App Manager Shutting down application $appId")
+      val (_, info) = appMasterRegistry.getOrElse(appId, (null, null))
+      Option(info) match {
+        case Some(info) =>
+          val worker = info.worker
+          val workerPath = Option(worker).map(_.path).orNull
+          LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, executorId: $executorId")
+          cleanApplicationData(appId)
+          val shutdown = ShutdownExecutor(appId, executorId,
+            s"AppMaster $appId shutdown requested by master...")
+          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
+          sender ! ShutdownApplicationResult(Success(appId))
+        case None =>
+          val errorMsg = s"Failed to find regisration information for appId: $appId"
+          LOG.error(errorMsg)
+          sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
+      }
+    case ResolveAppId(appId) =>
+      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
+      if (null != appMaster) {
+        sender ! ResolveAppIdResult(Success(appMaster))
+      } else {
+        sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
+      }
+    case AppMastersDataRequest =>
+      var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
+      appMasterRegistry.foreach(pair => {
+        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
+        val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+        val workerPath = Option(info.worker).map(worker =>
+          ActorUtil.getFullPath(context.system, worker.path))
+        appMastersData += AppMasterData(
+          AppMasterActive, id, info.appName, appMasterPath, workerPath.orNull,
+          info.submissionTime, info.startTime, info.finishTime, info.user)
+      })
+
+      deadAppMasters.foreach(pair => {
+        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
+        val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+        val workerPath = Option(info.worker).map(worker =>
+          ActorUtil.getFullPath(context.system, worker.path))
+
+        appMastersData += AppMasterData(
+          AppMasterInActive, id, info.appName, appMasterPath, workerPath.orNull,
+          info.submissionTime, info.startTime, info.finishTime, info.user)
+      })
+
+      sender ! AppMastersData(appMastersData.toList)
+    case QueryAppMasterConfig(appId) =>
+      val config =
+        if (appMasterRegistry.contains(appId)) {
+          val (_, info) = appMasterRegistry(appId)
+          info.config
+        } else if (deadAppMasters.contains(appId)) {
+          val (_, info) = deadAppMasters(appId)
+          info.config
+        } else {
+          null
+        }
+      sender ! AppMasterConfig(config)
+
+    case appMasterDataRequest: AppMasterDataRequest =>
+      val appId = appMasterDataRequest.appId
+      val (appStatus, appMaster, info) =
+        if (appMasterRegistry.contains(appId)) {
+          val (appMaster, info) = appMasterRegistry(appId)
+          (AppMasterActive, appMaster, info)
+        } else if (deadAppMasters.contains(appId)) {
+          val (appMaster, info) = deadAppMasters(appId)
+          (AppMasterInActive, appMaster, info)
+        } else {
+          (AppMasterNonExist, null, null)
+        }
+
+      appStatus match {
+        case AppMasterActive | AppMasterInActive =>
+          val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+          val workerPath = Option(info.worker).map(
+            worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
+          sender ! AppMasterData(
+            appStatus, appId, info.appName, appMasterPath, workerPath,
+            info.submissionTime, info.startTime, info.finishTime, info.user)
+
+        case AppMasterNonExist =>
+          sender ! AppMasterData(AppMasterNonExist)
+      }
+  }
+
+  def workerMessage: Receive = {
+    case ShutdownExecutorSucceed(appId, executorId) =>
+      LOG.info(s"Shut down executor $executorId for application $appId successfully")
+    case failed: ShutdownExecutorFailed =>
+      LOG.error(failed.reason)
+  }
+
+  private def shutDownExecutorTimeOut(): Unit = {
+    LOG.error(s"Shut down executor time out")
+  }
+
+  def appMasterMessage: Receive = {
+    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
+      val startTime = System.currentTimeMillis()
+      val register = registerBack.copy(startTime = startTime)
+
+      LOG.info(s"Register AppMaster for app: ${register.appId} $register")
+      context.watch(appMaster)
+      appMasterRegistry += register.appId -> (appMaster, register)
+      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+        MasterState(appId, appMasterRegistry, deadAppMasters))
+      sender ! AppMasterRegistered(register.appId)
+  }
+
+  def appDataStoreService: Receive = {
+    case SaveAppData(appId, key, value) =>
+      val client = sender
+      (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
+        case PutKVSuccess =>
+          client ! AppDataSaved
+        case PutKVFailed(k, ex) =>
+          client ! SaveAppDataFailed
+      }
+    case GetAppData(appId, key) =>
+      val client = sender
+      (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
+        case GetKVSuccess(privateKey, value) =>
+          client ! GetAppDataResult(key, value)
+        case GetKVFailed(ex) =>
+          client ! GetAppDataResult(key, null)
+      }
+  }
+
+  def terminationWatch: Receive = {
+    case terminate: Terminated =>
+      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
+        s"network down: ${terminate.getAddressTerminated()}")
+
+      // We assume that the only normal way to stop an application is to submit a
+      // ShutdownApplication request.
+      val application = appMasterRegistry.find { appInfo =>
+        val (_, (actorRef, _)) = appInfo
+        actorRef.compareTo(terminate.actor) == 0
+      }
+      if (application.nonEmpty) {
+        val appId = application.get._1
+        (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+          case GetKVSuccess(_, result) =>
+            val appState = result.asInstanceOf[ApplicationState]
+            if (appState != null) {
+              LOG.info(s"Recovering application, $appId")
+              self ! RecoverApplication(appState)
+            } else {
+              LOG.error(s"Cannot find application state for $appId")
+            }
+          case GetKVFailed(ex) =>
+            LOG.error(s"Cannot find master state to recover")
+        }
+      }
+  }
+
+  def selfMsgHandler: Receive = {
+    case RecoverApplication(state) =>
+      val appId = state.appId
+      if (appMasterRestartPolicies.get(appId).get.allowRestart) {
+        LOG.info(s"AppManager Recovering Application $appId...")
+        context.actorOf(launcher.props(appId, executorId, state.app, state.jar, state.username,
+          context.parent, None), s"launcher${appId}_${Util.randInt()}")
+      } else {
+        LOG.error(s"Application $appId failed too many times")
+      }
+  }
+
+  case class RecoverApplication(applicationStatus: ApplicationState)
+
+  private def cleanApplicationData(appId: Int): Unit = {
+    // Move the dead app into the deadAppMasters registry
+    appMasterRegistry.get(appId).foreach { pair =>
+      val (appMasterActor, info) = pair
+      deadAppMasters += appId -> (appMasterActor, info.copy(
+        finishTime = System.currentTimeMillis()))
+    }
+
+    appMasterRegistry -= appId
+
+    kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
+      MasterState(this.appId, appMasterRegistry, deadAppMasters))
+    kvService ! DeleteKVGroup(appId.toString)
+  }
+
+  private def applicationNameExist(appName: String): Boolean = {
+    appMasterRegistry.values.exists(_._2.appName == appName)
+  }
+}
+
+object AppManager {
+  // Keys used in the KV store
+  final val APP_STATE = "app_state"
+  final val MASTER_STATE = "master_state"
+
+  case class MasterState(
+      maxId: Int,
+      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
+      deadAppMasters: Map[Int, (ActorRef, AppMasterRuntimeInfo)])
+}
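
A note on the handler style above: receiveHandler chains independent Receive
partial functions with orElse, so client, AppMaster, worker, KV-store and
termination messages each live in their own handler. A minimal self-contained
sketch of the same composition pattern (hypothetical actor and messages, not
part of this commit):

    import akka.actor.Actor

    class ComposedActor extends Actor {
      // Messages unhandled by pingHandler fall through to adminHandler.
      def receive: Receive = pingHandler orElse adminHandler

      def pingHandler: Receive = {
        case "ping" => sender() ! "pong"
      }

      def adminHandler: Receive = {
        case "stop" => context.stop(self)
      }
    }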

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
new file mode 100644
index 0000000..3e54214
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
+
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ddata.Replicator._
+import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * A simple replicated in-memory KV service. Replicas are stored on all masters.
+ */
+class InMemoryKVService extends Actor with Stash {
+  import org.apache.gearpump.cluster.master.InMemoryKVService._
+
+  private val KV_SERVICE = "gearpump_kvservice"
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val replicator = DistributedData(context.system).replicator
+  private implicit val cluster = Cluster(context.system)
+
+  // Use majority reads and writes so that recovery can tolerate one master being down.
+  private val timeout = Duration(15, TimeUnit.SECONDS)
+  private val readMajority = ReadMajority(timeout)
+  private val writeMajority = WriteMajority(timeout)
+
+  private def groupKey(group: String): LWWMapKey[Any] = {
+    LWWMapKey[Any](KV_SERVICE + "_" + group)
+  }
+
+  def receive: Receive = kvService
+
+  def kvService: Receive = {
+
+    case GetKV(group: String, key: String) =>
+      val request = Request(sender(), key)
+      replicator ! Get(groupKey(group), readMajority, Some(request))
+    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val appData = success.get(group)
+      LOG.info(s"Successfully retrived group: ${group.id}")
+      request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
+    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      LOG.info(s"We cannot find group $group")
+      request.client ! GetKVSuccess(request.key, null)
+    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      val error = s"Failed to get application data, the request key is ${request.key}"
+      LOG.error(error)
+      request.client ! GetKVFailed(new Exception(error))
+
+    case PutKV(group: String, key: String, value: Any) =>
+      val request = Request(sender(), key)
+      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
+        map + (key -> value)
+      }
+      replicator ! update
+    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVSuccess
+    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new Exception(error, cause))
+    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+      request.client ! PutKVFailed(request.key, new TimeoutException())
+
+    case delete@DeleteKVGroup(group: String) =>
+      replicator ! Delete(groupKey(group), writeMajority)
+    case DeleteSuccess(group) =>
+      LOG.info(s"KV Group ${group.id} is deleted")
+    case ReplicationDeleteFailure(group) =>
+      LOG.error(s"Failed to delete KV Group ${group.id}...")
+    case DataDeleted(group) =>
+      LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
+  }
+}
+
+object InMemoryKVService {
+  /**
+   * KV Service related
+   */
+  case class GetKV(group: String, key: String)
+
+  trait GetKVResult
+
+  case class GetKVSuccess(key: String, value: Any) extends GetKVResult
+
+  case class GetKVFailed(ex: Throwable) extends GetKVResult
+
+  case class PutKV(group: String, key: String, value: Any)
+
+  case class DeleteKVGroup(group: String)
+
+  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
+
+  trait PutKVResult
+
+  case object PutKVSuccess extends PutKVResult
+
+  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
+
+  case class Request(client: ActorRef, key: String)
+}
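
Callers talk to InMemoryKVService through the ask pattern and match on the
GetKVResult/PutKVResult traits, as AppManager does above. A minimal read
helper as a sketch, assuming a kvService: ActorRef and an implicit timeout:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{ExecutionContext, Future}
    import org.apache.gearpump.cluster.master.InMemoryKVService._

    def readKey(kvService: ActorRef, group: String, key: String)(
        implicit timeout: Timeout, ec: ExecutionContext): Future[Option[Any]] = {
      (kvService ? GetKV(group, key)).map {
        case GetKVSuccess(_, value) => Option(value) // value is null when the key is absent
        case GetKVFailed(ex) => throw ex
      }
    }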

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
new file mode 100644
index 0000000..1536a23
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.lang.management.ManagementFactory
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+import akka.actor._
+import akka.remote.DisassociatedEvent
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToAppMaster._
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
+import org.apache.gearpump.cluster.MasterToWorker._
+import org.apache.gearpump.cluster.WorkerToMaster._
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.jarstore.local.LocalJarStore
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util._
+
+/**
+ * Master actor that manages the resources of the whole cluster.
+ * It is similar to the resource manager of YARN.
+ */
+private[cluster] class Master extends Actor with Stash {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val systemConfig: Config = context.system.settings.config
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
+  // Resources and resource requests can be reconstructed dynamically from worker and
+  // AppMaster heartbeats when the master singleton is migrated, so they don't need to
+  // be persisted in the cluster.
+  private var appManager: ActorRef = null
+
+  private var scheduler: ActorRef = null
+
+  private var workers = new immutable.HashMap[ActorRef, WorkerId]
+
+  private val birth = System.currentTimeMillis()
+
+  private var nextWorkerId = 0
+
+  def receive: Receive = null
+
+  // Register jvm metrics
+  Metrics(context.system).register(new JvmMetricsSet(s"master"))
+
+  LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
+
+  val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+
+  private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) {
+    Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath)))
+  } else {
+    None
+  }
+
+  private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
+
+  // Maintain the list of active masters.
+  private var masters: List[MasterNode] = {
+    // Add myself into the list of initial masters.
+    List(MasterNode(hostPort.host, hostPort.port))
+  }
+
+  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+
+  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+  val historyMetricsService = if (metricsEnabled) {
+    val historyMetricsService = {
+      context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
+    }
+
+    val metricsReportService = context.actorOf(
+      Props(new MetricsReporterService(Metrics(context.system))))
+    historyMetricsService.tell(ReportMetrics, metricsReportService)
+    Some(historyMetricsService)
+  } else {
+    None
+  }
+
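+  // Restore the next worker id from the replicated KV store before serving requests.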
+  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
+  context.become(waitForNextWorkerId)
+
+  def waitForNextWorkerId: Receive = {
+    case GetKVSuccess(_, result) =>
+      if (result != null) {
+        this.nextWorkerId = result.asInstanceOf[Int]
+      } else {
+        LOG.warn("Cannot find existing state in the distributed cluster...")
+      }
+      context.become(receiveHandler)
+      unstashAll()
+    case GetKVFailed(ex) =>
+      LOG.error("Failed to get worker id, shutting down master to avoid data corruption...")
+      context.parent ! PoisonPill
+    case msg =>
+      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  def receiveHandler: Receive = workerMsgHandler orElse
+    appMasterMsgHandler orElse
+    onMasterListChange orElse
+    clientMsgHandler orElse
+    metricsService orElse
+    jarStoreService orElse
+    terminationWatch orElse
+    disassociated orElse
+    kvServiceMsgHandler orElse
+    ActorUtil.defaultMsgHandler(self)
+
+  def workerMsgHandler: Receive = {
+    case RegisterNewWorker =>
+      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
+      nextWorkerId += 1
+      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
+      val workerHostname = ActorUtil.getHostname(sender())
+      LOG.info(s"Register new from $workerHostname ....")
+      self forward RegisterWorker(workerId)
+
+    case RegisterWorker(id) =>
+      context.watch(sender())
+      sender ! WorkerRegistered(id, MasterInfo(self, birth))
+      scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
+      workers += (sender() -> id)
+      val workerHostname = ActorUtil.getHostname(sender())
+      LOG.info(s"Register Worker with id $id from $workerHostname ....")
+    case resourceUpdate: ResourceUpdate =>
+      scheduler forward resourceUpdate
+  }
+
+  def jarStoreService: Receive = {
+    case GetJarStoreServer =>
+      jarStore.foreach(_ forward GetJarStoreServer)
+  }
+
+  def kvServiceMsgHandler: Receive = {
+    case PutKVSuccess =>
+      // Nothing to do
+    case PutKVFailed(key, exception) =>
+      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
+        ExceptionUtils.getStackTrace(exception))
+  }
+
+  def metricsService: Receive = {
+    case query: QueryHistoryMetrics =>
+      if (historyMetricsService.isEmpty) {
+        // Returns empty metrics so that we don't hang the UI
+        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      } else {
+        historyMetricsService.get forward query
+      }
+  }
+
+  def appMasterMsgHandler: Receive = {
+    case request: RequestResource =>
+      scheduler forward request
+    case registerAppMaster: RegisterAppMaster =>
+      // Forward to appManager
+      appManager forward registerAppMaster
+    case save: SaveAppData =>
+      appManager forward save
+    case get: GetAppData =>
+      appManager forward get
+    case GetAllWorkers =>
+      sender ! WorkerList(workers.values.toList)
+    case GetMasterData =>
+      val aliveFor = System.currentTimeMillis() - birth
+      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+      val userDir = System.getProperty("user.dir")
+
+      val masterDescription =
+        MasterSummary(
+          MasterNode(hostPort.host, hostPort.port),
+          masters,
+          aliveFor,
+          logFileDir,
+          jarStoreRootPath,
+          MasterStatus.Synced,
+          userDir,
+          List.empty[MasterActivity],
+          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+          historyMetricsConfig = getHistoryMetricsConfig
+        )
+
+      sender ! MasterData(masterDescription)
+
+    case invalidAppMaster: InvalidAppMaster =>
+      appManager forward invalidAppMaster
+  }
+
+  import scala.util.{Failure, Success}
+
+  def onMasterListChange: Receive = {
+    case MasterListUpdated(masters: List[MasterNode]) =>
+      this.masters = masters
+  }
+
+  def clientMsgHandler: Receive = {
+    case app: SubmitApplication =>
+      LOG.debug(s"Receive from client, SubmitApplication $app")
+      appManager.forward(app)
+    case app: RestartApplication =>
+      LOG.debug(s"Receive from client, RestartApplication $app")
+      appManager.forward(app)
+    case app: ShutdownApplication =>
+      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
+      scheduler ! ApplicationFinished(app.appId)
+      appManager.forward(app)
+    case app: ResolveAppId =>
+      LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
+      appManager.forward(app)
+    case resolve: ResolveWorkerId =>
+      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
+      val worker = workers.find(_._2 == resolve.workerId)
+      worker match {
+        case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
+        case None => sender ! ResolveWorkerIdResult(Failure(
+          new Exception(s"cannot find worker ${resolve.workerId}")))
+      }
+    case AppMastersDataRequest =>
+      LOG.debug("Master received AppMastersDataRequest")
+      appManager forward AppMastersDataRequest
+    case appMasterDataRequest: AppMasterDataRequest =>
+      LOG.debug("Master received AppMasterDataRequest")
+      appManager forward appMasterDataRequest
+    case query: QueryAppMasterConfig =>
+      LOG.debug("Master received QueryAppMasterConfig")
+      appManager forward query
+    case QueryMasterConfig =>
+      sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+  }
+
+  def disassociated: Receive = {
+    case disassociated: DisassociatedEvent =>
+      LOG.info(s" disassociated ${disassociated.remoteAddress}")
+  }
+
+  def terminationWatch: Receive = {
+    case t: Terminated =>
+      val actor = t.actor
+      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" +
+        t.getAddressTerminated())
+
+      LOG.info("Let's filter out dead resources...")
+      // Removes the dead worker's resources from scheduling
+      if (workers.keySet.contains(actor)) {
+        scheduler ! WorkerTerminated(workers.get(actor).get)
+        workers -= actor
+      }
+  }
+
+  override def preStart(): Unit = {
+    val path = ActorUtil.getFullPath(context.system, self.path)
+    LOG.info(s"master path is $path")
+    val schedulerClass = Class.forName(
+      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
+
+    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
+      classOf[AppManager].getSimpleName)
+    scheduler = context.actorOf(Props(schedulerClass))
+    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
+  }
+}
+
+object Master {
+  final val MASTER_GROUP = "master_group"
+
+  final val WORKER_ID = "next_worker_id"
+
+  case class WorkerTerminated(workerId: WorkerId)
+
+  case class MasterInfo(master: ActorRef, startTime: Long = 0L)
+
+  /** Notify the subscriber that master actor list has been updated */
+  case class MasterListUpdated(masters: List[MasterNode])
+
+  object MasterInfo {
+    def empty: MasterInfo = MasterInfo(null)
+  }
+
+  case class SlotStatus(totalSlots: Int, availableSlots: Int)
+}
\ No newline at end of file
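
clientMsgHandler above answers ResolveWorkerId with a ResolveWorkerIdResult
wrapping a Try[ActorRef]. A sketch of the client side, assuming a master:
ActorRef and an implicit timeout:

    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.{Failure, Success}
    import org.apache.gearpump.cluster.ClientToMaster.ResolveWorkerId
    import org.apache.gearpump.cluster.MasterToClient.ResolveWorkerIdResult
    import org.apache.gearpump.cluster.worker.WorkerId

    def resolveWorker(master: ActorRef, workerId: WorkerId)(
        implicit timeout: Timeout, ec: ExecutionContext): Future[ActorRef] = {
      (master ? ResolveWorkerId(workerId)).mapTo[ResolveWorkerIdResult].map {
        case ResolveWorkerIdResult(Success(worker)) => worker
        case ResolveWorkerIdResult(Failure(ex)) => throw ex
      }
    }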

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
new file mode 100644
index 0000000..1429694
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.mutable
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
+
+/** Assigns resources to applications based on application priority */
+class PriorityScheduler extends Scheduler {
+  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
+
+  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
+    override def compare(x: PendingRequest, y: PendingRequest): Int = {
+      var res = x.request.priority.id - y.request.priority.id
+      if (res == 0) {
+        res = y.timeStamp.compareTo(x.timeStamp)
+      }
+      res
+    }
+  }
+
+  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
+
+  override def allocateResource(): Unit = {
+    var scheduleLater = Array.empty[PendingRequest]
+    val resourcesSnapShot = resources.clone()
+    var allocated = Resource.empty
+    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
+
+    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
+      val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
+      request.relaxation match {
+        case ANY =>
+          val allocations = allocateFairly(resourcesSnapShot, request)
+          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
+          if (allocations.nonEmpty) {
+            appMaster ! ResourceAllocated(allocations.toArray)
+          }
+          if (newAllocated < request.resource) {
+            val remainingRequest = request.resource - newAllocated
+            val remainingExecutors = request.executorNum - allocations.length
+            val newResourceRequest = request.copy(resource = remainingRequest,
+              executorNum = remainingExecutors)
+            scheduleLater = scheduleLater :+
+              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
+          }
+          allocated = allocated + newAllocated
+        case ONEWORKER =>
+          val availableResource = resourcesSnapShot.find { params =>
+            val (_, (_, resource)) = params
+            resource > request.resource
+          }
+          if (availableResource.nonEmpty) {
+            val (workerId, (worker, resource)) = availableResource.get
+            allocated = allocated + request.resource
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              workerId)))
+            resourcesSnapShot.update(workerId, (worker, resource - request.resource))
+          } else {
+            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+          }
+        case SPECIFICWORKER =>
+          val workerAndResource = resourcesSnapShot.get(request.workerId)
+          if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
+            val (worker, availableResource) = workerAndResource.get
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+              request.workerId)))
+            allocated = allocated + request.resource
+            resourcesSnapShot.update(request.workerId, (worker,
+              availableResource - request.resource))
+          } else {
+            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+          }
+      }
+    }
+    for (request <- scheduleLater)
+      resourceRequests.enqueue(request)
+  }
+
+  def resourceRequestHandler: Receive = {
+    case RequestResource(appId, request) =>
+      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
+        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
+      val appMaster = sender()
+      resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
+        System.currentTimeMillis()))
+      allocateResource()
+  }
+
+  override def doneApplication(appId: Int): Unit = {
+    resourceRequests = resourceRequests.filter(_.appId != appId)
+  }
+
+  private def allocateFairly(
+      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
+    : List[ResourceAllocation] = {
+    val workerNum = resources.size
+    var allocations = List.empty[ResourceAllocation]
+    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
+    var remainingRequest = request.resource
+    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
+
+    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
+      val executorNum = Math.min(workerNum, remainingExecutors)
+      val toRequest = Resource(remainingRequest.slots * executorNum / remainingExecutors)
+
+      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
+      val pickedResources = sortedResources.take(executorNum)
+
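+      // Interleave the slots of the picked workers so the allocation is spread across
+      // workers instead of draining one worker at a time.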
+      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
+        val ((workerId, (worker, resource)), index) = workerWithIndex
+        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
+      }.sortBy(_._2).map(_._1)
+
+      if (flattenResource.length < toRequest.slots) {
+        // Cannot satisfy the user's requirements
+        totalAvailable = Resource.empty
+      } else {
+        flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
+          toArray.foreach { params =>
+          val ((workerId, worker), slots) = params
+          resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
+          allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
+        }
+        totalAvailable -= toRequest
+        remainingRequest -= toRequest
+        remainingExecutors -= executorNum
+      }
+    }
+    allocations
+  }
+}
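
requestOrdering above dequeues higher-priority requests first and, within the
same priority, older requests first (mutable.PriorityQueue.dequeue returns the
maximum element, and the comparator inverts the timestamp comparison). A toy
demonstration of the same ordering on (priorityId, timeStamp) tuples:

    import scala.collection.mutable

    val ordering = new Ordering[(Int, Long)] {
      // Higher priority wins; within one priority, the smaller timestamp wins.
      def compare(x: (Int, Long), y: (Int, Long)): Int = {
        val byPriority = x._1 - y._1
        if (byPriority != 0) byPriority else y._2.compareTo(x._2)
      }
    }

    val queue = new mutable.PriorityQueue[(Int, Long)]()(ordering)
    queue.enqueue((1, 100L), (3, 200L), (3, 50L))
    println(queue.dequeue()) // (3,50): highest priority, earliest timestamp
    println(queue.dequeue()) // (3,200)
    println(queue.dequeue()) // (1,100)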

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
new file mode 100644
index 0000000..7187c1a
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.mutable
+
+import akka.actor.{Actor, ActorRef}
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.WorkerTerminated
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Scheduler schedules resources for different applications.
+ */
+abstract class Scheduler extends Actor {
+  val LOG: Logger = LogUtil.getLogger(getClass)
+  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
+
+  def handleScheduleMessage: Receive = {
+    case WorkerRegistered(id, _) =>
+      if (!resources.contains(id)) {
+        LOG.info(s"Worker $id added to the scheduler")
+        resources.put(id, (sender, Resource.empty))
+      }
+    case update@ResourceUpdate(worker, workerId, resource) =>
+      LOG.info(s"$update...")
+      if (resources.contains(workerId)) {
+        val resourceReturned = resource > resources.get(workerId).get._2
+        resources.update(workerId, (worker, resource))
+        if (resourceReturned) {
+          allocateResource()
+        }
+        sender ! UpdateResourceSucceed
+      }
+      else {
+        sender ! UpdateResourceFailed(
+          s"ResourceUpdate failed! The worker $workerId has not been registered into master")
+      }
+    case WorkerTerminated(workerId) =>
+      if (resources.contains(workerId)) {
+        resources -= workerId
+      }
+    case ApplicationFinished(appId) =>
+      doneApplication(appId)
+  }
+
+  def allocateResource(): Unit
+
+  def doneApplication(appId: Int): Unit
+}
+
+object Scheduler {
+  case class PendingRequest(
+      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+
+  case class ApplicationFinished(appId: Int)
+}
\ No newline at end of file
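
Concrete schedulers only need to supply allocateResource() and
doneApplication(); the worker bookkeeping in handleScheduleMessage is shared.
A do-nothing subclass as a sketch (hypothetical; PriorityScheduler above is
the real implementation):

    import org.apache.gearpump.cluster.scheduler.Scheduler

    // Registers workers through handleScheduleMessage but never hands out
    // resources; useful only to illustrate the abstract contract.
    class NoopScheduler extends Scheduler {
      override def receive: Receive = handleScheduleMessage
      override def allocateResource(): Unit = ()
      override def doneApplication(appId: Int): Unit = ()
    }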

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
new file mode 100644
index 0000000..2f1e23d
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
+
+/** Launcher to start an executor process */
+class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override def createProcess(
+      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+
+    LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
+    Util.startProcess(options, classPath, mainClass, arguments)
+  }
+
+  override def cleanProcess(appId: Int, executorId: Int): Unit = {}
+}
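
The worker instantiates this launcher reflectively and requires a
single-Config constructor (see getExecutorProcLauncher in Worker.scala below).
A sketch of an alternative launcher; the class name and JVM flag are made up
for illustration:

    import com.typesafe.config.Config
    import org.apache.gearpump.cluster.scheduler.Resource
    import org.apache.gearpump.cluster.worker.ExecutorProcessLauncher
    import org.apache.gearpump.util.{RichProcess, Util}

    class VerboseGcProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
      override def createProcess(
          appId: Int, executorId: Int, resource: Resource, config: Config,
          options: Array[String], classPath: Array[String], mainClass: String,
          arguments: Array[String]): RichProcess = {
        // Prepend a JVM flag, then delegate to the same helper the default uses.
        Util.startProcess("-verbose:gc" +: options, classPath, mainClass, arguments)
      }

      override def cleanProcess(appId: Int, executorId: Int): Unit = ()
    }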

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
new file mode 100644
index 0000000..b290938
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.net.URL
+import java.util.concurrent.{Executors, TimeUnit}
+import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
+import org.apache.gearpump.cluster.MasterToWorker._
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.WorkerToMaster._
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.util.ActorSystemBooter.Daemon
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util.{TimeOutScheduler, _}
+
+/**
+ * Worker tracks the resources on a single machine. It is like
+ * the node manager of YARN.
+ *
+ * @param masterProxy used to resolve the master
+ */
+private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
+  private val systemConfig: Config = context.system.settings.config
+
+  private val address = ActorUtil.getFullPath(context.system, self.path)
+  private var resource = Resource.empty
+  private var allocatedResources = Map[ActorRef, Resource]()
+  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
+  private var id: WorkerId = WorkerId.unspecified
+  private val createdTime = System.currentTimeMillis()
+  private var masterInfo: MasterInfo = null
+  private var executorNameToActor = Map.empty[String, ActorRef]
+  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
+  private val jarStoreService = JarStoreService.get(systemConfig)
+  jarStoreService.init(systemConfig, context.system)
+
+  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
+
+  private var totalSlots: Int = 0
+
+  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+  var historyMetricsService: Option[ActorRef] = None
+
+  override def receive: Receive = null
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  def service: Receive =
+    appMasterMsgHandler orElse
+      clientMessageHandler orElse
+      metricsService orElse
+      terminationWatch(masterInfo.master) orElse
+      ActorUtil.defaultMsgHandler(self)
+
+  def metricsService: Receive = {
+    case query: QueryHistoryMetrics =>
+      if (historyMetricsService.isEmpty) {
+        // Returns empty metrics so that we don't hang the UI
+        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      } else {
+        historyMetricsService.get forward query
+      }
+  }
+
+  private var metricsInitialized = false
+
+  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+  private def initializeMetrics(): Unit = {
+    // Registers jvm metrics
+    val metricsSetName = "worker" + WorkerId.render(id)
+    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
+
+    historyMetricsService = if (metricsEnabled) {
+      val historyMetricsService = {
+        context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
+      }
+
+      val metricsReportService = context.actorOf(Props(
+        new MetricsReporterService(Metrics(context.system))))
+      historyMetricsService.tell(ReportMetrics, metricsReportService)
+      Some(historyMetricsService)
+    } else {
+      None
+    }
+  }
+
+  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
+
+    // If the master gets disconnected, WorkerRegistered may be triggered multiple times.
+    case WorkerRegistered(id, masterInfo) =>
+      this.id = id
+
+      // Guards with a flag so that we don't re-initialize the metrics when the worker
+      // re-registers itself.
+      if (!metricsInitialized) {
+        initializeMetrics()
+        metricsInitialized = true
+      }
+
+      this.masterInfo = masterInfo
+      timeoutTicker.cancel()
+      context.watch(masterInfo.master)
+      this.LOG = LogUtil.getLogger(getClass, worker = id)
+      LOG.info(s"Worker is registered. " +
+        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
+      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
+        resourceUpdateTimeoutMs, updateResourceTimeOut())
+      context.become(service)
+  }
+
+  private def updateResourceTimeOut(): Unit = {
+    LOG.error(s"Update worker resource time out")
+  }
+
+  def appMasterMsgHandler: Receive = {
+    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
+      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      val executorToStop = executorNameToActor.get(actorName)
+      if (executorToStop.isDefined) {
+        LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
+          s"due to: $reason")
+        executorToStop.get.forward(shutdown)
+      } else {
+        LOG.error(s"Cannot find executor $actorName, ignore this message")
+        sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
+      }
+    case launch: LaunchExecutor =>
+      LOG.info(s"$launch")
+      if (resource < launch.resource) {
+        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
+      } else {
+        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
+
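+        // Launching and supervising the executor process is delegated to a child
+        // ExecutorWatcher actor.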
+        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
+          jarStoreService, executorProcLauncher))
+        executorNameToActor += actorName -> executor
+
+        resource = resource - launch.resource
+        allocatedResources = allocatedResources + (executor -> launch.resource)
+
+        reportResourceToMaster()
+        executorsInfo += executor ->
+          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
+        context.watch(executor)
+      }
+    case UpdateResourceFailed(reason, ex) =>
+      LOG.error(reason)
+      context.stop(self)
+    case UpdateResourceSucceed =>
+      LOG.info(s"Update resource succeed")
+    case GetWorkerData(workerId) =>
+      val aliveFor = System.currentTimeMillis() - createdTime
+      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+      val userDir = System.getProperty("user.dir")
+      sender ! WorkerData(WorkerSummary(
+        id, "active",
+        address,
+        aliveFor,
+        logDir,
+        executorsInfo.values.toArray,
+        totalSlots,
+        resource.slots,
+        userDir,
+        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+        resourceManagerContainerId = systemConfig.getString(
+          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
+        historyMetricsConfig = getHistoryMetricsConfig)
+      )
+    case ChangeExecutorResource(appId, executorId, usedResource) =>
+      for (executor <- executorActorRef(appId, executorId);
+        allocatedResource <- allocatedResources.get(executor)) {
+
+        allocatedResources += executor -> usedResource
+        resource = resource + allocatedResource - usedResource
+        reportResourceToMaster()
+
+        if (usedResource == Resource(0)) {
+          allocatedResources -= executor
+          // Stop the executor if there is no resource bound to it.
+          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
+          executor ! ShutdownExecutor(appId, executorId,
+            "Shutdown executor because the resource used is zero")
+        }
+      }
+  }
+
+  private def reportResourceToMaster(): Unit = {
+    sendMsgWithTimeOutCallBack(masterInfo.master,
+      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
+  }
+
+  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
+    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+    executorNameToActor.get(actorName)
+  }
+
+  def clientMessageHandler: Receive = {
+    case QueryWorkerConfig(workerId) =>
+      if (this.id == workerId) {
+        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+      } else {
+        sender ! WorkerConfig(ConfigFactory.empty)
+      }
+  }
+
+  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
+    repeatActionUtil(
+      seconds = timeOutSeconds,
+      action = () => {
+        masterProxy ! RegisterWorker(workerId)
+      },
+      onTimeout = () => {
+        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
+          s"seconds, abort and kill the worker...")
+        self ! PoisonPill
+      })
+  }
+
+  def terminationWatch(master: ActorRef): Receive = {
+    case Terminated(actor) =>
+      if (actor.compareTo(master) == 0) {
+        // Parent master is down. Try to register with a new master; if none is
+        // found within the timeout, the worker kills itself to free resources.
+        LOG.info(s"Master cannot be contacted, looking for a new master ...")
+        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
+      } else if (ActorUtil.isChildActorPath(self, actor)) {
+        // One executor is down.
+        LOG.info(s"Executor is down: ${getExecutorName(actor).getOrElse("unknown")}")
+
+        val allocated = allocatedResources.get(actor)
+        if (allocated.isDefined) {
+          resource = resource + allocated.get
+          executorsInfo -= actor
+          allocatedResources = allocatedResources - actor
+          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
+            resourceUpdateTimeoutMs, updateResourceTimeOut())
+        }
+      }
+  }
+
+  private def getExecutorName(actorRef: ActorRef): Option[String] = {
+    executorNameToActor.find(_._2 == actorRef).map(_._1)
+  }
+
+  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
+    val launcherClazz = Class.forName(
+      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
+    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
+      .asInstanceOf[ExecutorProcessLauncher]
+  }
+
+  import context.dispatcher
+  override def preStart(): Unit = {
+    LOG.info(s"RegisterNewWorker")
+    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
+    this.resource = Resource(totalSlots)
+    masterProxy ! RegisterNewWorker
+    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
+  }
+
+  private def registerTimeoutTicker(seconds: Int): Cancellable = {
+    repeatActionUtil(seconds, () => (), () => {
+      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
+        s"abort and kill the worker...")
+      self ! PoisonPill
+    })
+  }
+
+  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
+    : Cancellable = {
+    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
+      Duration(2, TimeUnit.SECONDS))(action())
+    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
+    new Cancellable {
+      def cancel(): Boolean = {
+        val result1 = cancelTimeout.cancel()
+        val result2 = cancelSuicide.cancel()
+        result1 && result2
+      }
+
+      def isCancelled: Boolean = {
+        cancelTimeout.isCancelled && cancelSuicide.isCancelled
+      }
+    }
+  }
+
+  override def postStop(): Unit = {
+    LOG.info(s"Worker is going down....")
+    ioPool.shutdown()
+    context.system.terminate()
+  }
+}
+
+private[cluster] object Worker {
+
+  case class ExecutorResult(result: Try[Int])
+
+  class ExecutorWatcher(
+      launch: LaunchExecutor,
+      masterInfo: MasterInfo,
+      ioPool: ExecutionContext,
+      jarStoreService: JarStoreService,
+      procLauncher: ExecutorProcessLauncher) extends Actor {
+    import launch.{appId, executorId, resource}
+
+    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
+
+    val executorConfig: Config = {
+      val workerConfig = context.system.settings.config
+
+      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
+        Option(jvmConfig.executorAkkaConfig)
+      }.getOrElse(ConfigFactory.empty())
+
+      resolveExecutorConfig(workerConfig, submissionConfig)
+    }
+
+    // For some config keys the worker config has priority; for others, the
+    // user's application submission config does.
+    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
+      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
+        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
+        .withoutPath(GEARPUMP_HOME)
+        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
+        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
+        // Falls back to workerConfig
+        .withFallback(workerConfig)
+
+      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
+      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
+      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
+        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
+        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
+      } else {
+        config
+      }
+
+      // Excludes reference.conf and JVM system properties.
+      ClusterConfig.filterOutDefaultConfig(updatedConf)
+    }
+
+    implicit val executorService = ioPool
+
+    private val executorHandler = {
+      val ctx = launch.executorJvmConfig
+
+      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
+        new ExecutorHandler {
+          val exitPromise = Promise[Int]()
+          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
+
+          override def destroy(): Unit = {
+            context.stop(app)
+          }
+          override def exitValue: Future[Int] = {
+            exitPromise.future
+          }
+        }
+      } else {
+        createProcess(ctx)
+      }
+    }
+
+    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
+
+      val process = Future {
+        val jarPath = ctx.jar.map { appJar =>
+          val tempFile = File.createTempFile(appJar.name, ".jar")
+          jarStoreService.copyToLocalFile(tempFile, appJar.filePath)
+          val file = new URL("file:" + tempFile)
+          file.getFile
+        }
+
+        val configFile = {
+          val configFile = File.createTempFile("gearpump", ".conf")
+          ClusterConfig.saveConfig(executorConfig, configFile)
+          val file = new URL("file:" + configFile)
+          file.getFile
+        }
+
+        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
+          ctx.classPath.map(path => expandEnvironment(path)) ++
+          jarPath.map(Array(_)).getOrElse(Array.empty[String])
+
+        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
+        val logArgs = List(
+          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
+          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
+          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
+          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
+        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
+
+        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
+
+        // Remote debug executor process
+        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
+        val remoteDebugConfig = if (remoteDebugFlag) {
+          val availablePort = Util.findFreePort().get
+          List(
+            "-Xdebug",
+            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
+            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
+        val verboseGCConfig = if (verboseGCFlag) {
+          List(
+            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
+            "-verbose:gc",
+            "-XX:+PrintGCDetails",
+            "-XX:+PrintGCDateStamps",
+            "-XX:+PrintTenuringDistribution",
+            "-XX:+PrintGCApplicationConcurrentTime",
+            "-XX:+PrintGCApplicationStoppedTime"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val ipv4 = List(s"-D${PREFER_IPV4}=true")
+
+        val options = ctx.jvmArguments ++ username ++
+          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
+
+        LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
+        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
+          options, classPath, ctx.mainClass, ctx.arguments)
+
+        ProcessInfo(process, jarPath, configFile)
+      }
+
+      new ExecutorHandler {
+
+        var destroyed = false
+
+        override def destroy(): Unit = {
+          LOG.info(s"Destroy executor process ${ctx.mainClass}")
+          if (!destroyed) {
+            destroyed = true
+            process.foreach { info =>
+              info.process.destroy()
+              info.jarPath.foreach(new File(_).delete())
+              new File(info.configFile).delete()
+            }
+          }
+        }
+
+        override def exitValue: Future[Int] = {
+          process.flatMap { info =>
+            val exit = info.process.exitValue()
+            if (exit == 0) {
+              Future.successful(0)
+            } else {
+              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
+              s"error summary: ${info.process.logger.error}"))
+            }
+          }
+        }
+      }
+    }
+
+    private def expandEnvironment(path: String): String = {
+      // TODO: extend this to support more environment variables.
+      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
+    }
+
+    override def preStart(): Unit = {
+      executorHandler.exitValue.onComplete { value =>
+        procLauncher.cleanProcess(appId, executorId)
+        val result = ExecutorResult(value)
+        self ! result
+      }
+    }
+
+    override def postStop(): Unit = {
+      executorHandler.destroy()
+    }
+
+    // These folders, under ${GEARPUMP_HOME}, hold daemon-only jars that must
+    // not leak onto the executor classpath.
+    val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
+      File.separator + "yarn")
+
+    override def receive: Receive = {
+      case ShutdownExecutor(appId, executorId, reason: String) =>
+        executorHandler.destroy()
+        sender ! ShutdownExecutorSucceed(appId, executorId)
+        context.stop(self)
+      case ExecutorResult(executorResult) =>
+        executorResult match {
+          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
+          case Failure(e) => LOG.error("Executor exit with errors", e)
+        }
+        context.stop(self)
+    }
+
+    private def getFormattedTime(timestamp: Long): String = {
+      val datePattern = "yyyy-MM-dd-HH-mm"
+      val format = new java.text.SimpleDateFormat(datePattern)
+      format.format(timestamp)
+    }
+
+    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
+      classPath.filterNot(matchDaemonPattern(_))
+    }
+
+    private def matchDaemonPattern(path: String): Boolean = {
+      daemonPathPattern.exists(path.contains(_))
+    }
+  }
+
+  trait ExecutorHandler {
+    def destroy(): Unit
+    def exitValue: Future[Int]
+  }
+
+  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
+
+  /**
+   * Starts the executor in the same JVM as the worker.
+   */
+  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
+    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
+    private val exitCode = 0
+
+    override val supervisorStrategy =
+      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
+        case ex: Throwable =>
+          LOG.error(s"system $name stopped ", ex)
+          exit.failure(ex)
+          Stop
+      }
+
+    override def postStop(): Unit = {
+      if (!exit.isCompleted) {
+        exit.success(exitCode)
+      }
+    }
+  }
+}
\ No newline at end of file
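Aside: the precedence rule that resolveExecutorConfig implements above can be
reproduced with plain Typesafe Config calls. The sketch below is illustrative
only and not part of the commit; the config keys are stand-ins:

    import com.typesafe.config.ConfigFactory

    object ConfigPrecedenceSketch extends App {
      // Submission-side config (stand-in keys for illustration).
      val submission = ConfigFactory.parseString(
        """
          |gearpump.hostname = submitter-host
          |gearpump.serializer = custom
        """.stripMargin)

      // Worker-side config, which must win for certain keys.
      val worker = ConfigFactory.parseString(
        """
          |gearpump.hostname = worker-host
          |gearpump.serializer = default
        """.stripMargin)

      // Strip the worker-priority keys from the submission config, then fall
      // back to the worker config, as resolveExecutorConfig does.
      val resolved = submission
        .withoutPath("gearpump.hostname")
        .withFallback(worker)

      println(resolved.getString("gearpump.hostname"))   // worker-host
      println(resolved.getString("gearpump.serializer")) // custom
    }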

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
new file mode 100644
index 0000000..d34f5c2
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.dfs
+
+import java.io.File
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.slf4j.Logger
+
+import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * DFSJarStoreService stores uploaded jars on HDFS.
+ */
+class DFSJarStoreService extends JarStoreService {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private var rootPath: Path = null
+
+  override val scheme: String = "hdfs"
+
+  override def init(config: Config, actorRefFactory: ActorSystem): Unit = {
+    rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
+    val fs = rootPath.getFileSystem(new Configuration())
+    if (!fs.exists(rootPath)) {
+      fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    }
+  }
+
+  /**
+   * Copies a remote file from the jar store to the local file system.
+   * Called from the client side.
+   *
+   * @param localFile the destination file path
+   * @param remotePath the remote file path in the jar store
+   */
+  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
+    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${remotePath}")
+    val filePath = new Path(rootPath, remotePath.path)
+    val fs = filePath.getFileSystem(new Configuration())
+    val target = new Path(localFile.toURI().toString)
+    fs.copyToLocalFile(filePath, target)
+  }
+
+  /**
+   * Copies a local file to the remote jar store. Called from the client side.
+   *
+   * @param localFile the local file to upload
+   * @return the path of the file in the jar store
+   */
+  override def copyFromLocal(localFile: File): FilePath = {
+    val remotePath = FilePath(Math.abs(new java.util.Random().nextLong()).toString)
+    LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${remotePath}")
+    val filePath = new Path(rootPath, remotePath.path)
+    val fs = filePath.getFileSystem(new Configuration())
+    fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath)
+    remotePath
+  }
+}
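
For orientation, a hypothetical client-side round trip through this service
might look like the sketch below. It assumes an ActorSystem and that
Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH in the loaded config points at a
reachable HDFS directory; the file paths are made up:

    import java.io.File

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    import org.apache.gearpump.jarstore.dfs.DFSJarStoreService

    object DfsJarStoreRoundTrip extends App {
      val system = ActorSystem("jarstore-demo")
      val service = new DFSJarStoreService
      service.init(ConfigFactory.load(), system)

      // Upload: the service picks a random remote name and returns it.
      val remote = service.copyFromLocal(new File("/tmp/myapp.jar"))

      // Download the same jar back to a different local path.
      service.copyToLocalFile(new File("/tmp/myapp-copy.jar"), remote)

      system.terminate()
    }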

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
new file mode 100644
index 0000000..9bd7071
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore.local
+
+import java.io.File
+
+import akka.actor.{Actor, Stash}
+import akka.pattern.pipe
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import org.apache.gearpump.util._
+
+/**
+ * LocalJarStore stores uploaded jars on the local disk.
+ */
+class LocalJarStore(rootDirPath: String) extends Actor with Stash {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
+  val rootDirectory = new File(rootDirPath)
+
+  FileUtils.forceMkdir(rootDirectory)
+
+  val server = new FileServer(context.system, host, 0, rootDirectory)
+
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  implicit val executionContext = context.dispatcher
+
+  server.start pipeTo self
+
+  def receive: Receive = {
+    case FileServer.Port(port) =>
+      context.become(listen(port))
+      unstashAll()
+    case _ =>
+      stash()
+  }
+
+  def listen(port: Int): Receive = {
+    case GetJarStoreServer =>
+      sender ! JarStoreServerAddress(s"http://$host:$port/")
+  }
+
+  override def postStop(): Unit = {
+    server.stop
+  }
+}
\ No newline at end of file
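LocalJarStore stashes every message until the embedded FileServer reports its
port, then switches behavior and replays them. A minimal, standalone sketch of
that stash-until-initialized idiom (all names here are illustrative):

    import akka.actor.{Actor, Stash}

    // Buffer all messages until asynchronous initialization completes, then
    // switch to the ready behavior and replay the buffered messages.
    class StashUntilReady extends Actor with Stash {
      case class Ready(port: Int)

      def receive: Receive = {
        case Ready(port) =>
          context.become(ready(port))
          unstashAll() // replay everything received before we were ready
        case _ =>
          stash()
      }

      def ready(port: Int): Receive = {
        case msg => sender() ! s"served $msg on port $port"
      }
    }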

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
new file mode 100644
index 0000000..1ab103f
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.local
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.ask
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
+import org.apache.gearpump.util._
+
+/**
+ * LocalJarStoreService stores uploaded jars on the local disk.
+ */
+class LocalJarStoreService extends JarStoreService {
+  private def LOG: Logger = LogUtil.getLogger(getClass)
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  private var system: akka.actor.ActorSystem = null
+  private var master: ActorRef = null
+  private implicit def dispatcher: ExecutionContext = system.dispatcher
+
+  override val scheme: String = "file"
+
+  override def init(config: Config, system: ActorSystem): Unit = {
+    this.system = system
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
+  }
+
+  private lazy val client = (master ? GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]]
+    .map { address =>
+      val client = new FileServer.Client(system, address.url)
+      client
+    }
+
+  /**
+   * Copies a remote file from the jar store to the local file system.
+   * Called from the client side.
+   *
+   * @param localFile the destination file path
+   * @param remotePath the remote file path in the jar store
+   */
+  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
+    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
+    val future = client.flatMap(_.download(remotePath, localFile))
+    Await.ready(future, Duration(60, TimeUnit.SECONDS))
+  }
+
+  /**
+   * Copies a local file to the remote jar store. Called from the client side.
+   *
+   * @param localFile the local file to upload
+   * @return the path of the file in the jar store
+   */
+  override def copyFromLocal(localFile: File): FilePath = {
+    val future = client.flatMap(_.upload(localFile))
+    Await.result(future, Duration(60, TimeUnit.SECONDS))
+  }
+}
\ No newline at end of file
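The `lazy val client` above memoizes an asynchronous lookup: the ask goes out
once, and every later upload/download reuses the completed Future. A generic
sketch of that idiom, with hypothetical names:

    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    // Memoize an async lookup: `lookup()` runs once, on first use, and the
    // resulting Future is shared by all subsequent calls.
    class MemoizedLookup(lookup: () => Future[String])
                        (implicit ec: ExecutionContext) {
      private lazy val address: Future[String] = lookup()

      def download(path: String): String =
        Await.result(address.map(a => s"GET $a/$path"), 60.seconds)
    }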

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
new file mode 100644
index 0000000..66bb9ba
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.File
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.stream.Materializer
+import akka.stream.scaladsl.FileIO
+import akka.util.ByteString
+
+/**
+ * FileDirective is a set of Akka HTTP directives for uploading/downloading
+ * large binary files to/from an Akka HTTP server.
+ */
+object FileDirective {
+
+  // Form field name
+  type Name = String
+
+  val CHUNK_SIZE = 262144 // 256KB per streamed chunk
+
+  /**
+   * File information after a file is uploaded to server.
+   *
+   * @param originFileName the original file name as uploaded from the user's browser
+   * @param file the file after it has been saved on the server
+   * @param length the length of the file in bytes
+   */
+  case class FileInfo(originFileName: String, file: File, length: Long)
+
+  class Form(val fields: Map[Name, FormField]) {
+    def getFile(fieldName: String): Option[FileInfo] = {
+      fields.get(fieldName).flatMap {
+        case Left(file) => Option(file)
+        case Right(_) => None
+      }
+    }
+
+    def getValue(fieldName: String): Option[String] = {
+      fields.get(fieldName).flatMap {
+        case Left(_) => None
+        case Right(value) => Option(value)
+      }
+    }
+  }
+
+  type FormField = Either[FileInfo, String]
+
+  /**
+   * Directive to upload files. It stores the uploaded files in a
+   * temporary directory and returns a Form mapping form field names
+   * to FileInfo.
+   */
+  def uploadFile: Directive1[Form] = {
+    uploadFileTo(null)
+  }
+
+  /**
+   * Stores the uploaded files under the given rootDirectory.
+   *
+   * @param rootDirectory directory in which to store the files
+   * @return a directive extracting the parsed multipart form
+   */
+  def uploadFileTo(rootDirectory: File): Directive1[Form] = {
+    Directive[Tuple1[Form]] { inner =>
+      extractMaterializer { implicit mat =>
+        extractExecutionContext { implicit ec =>
+          uploadFileImpl(rootDirectory)(mat, ec) { filesFuture =>
+            ctx => {
+              filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx))
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // Downloads file from server
+  def downloadFile(file: File): Route = {
+    val responseEntity = HttpEntity(
+      MediaTypes.`application/octet-stream`,
+      file.length,
+      FileIO.fromFile(file, CHUNK_SIZE))
+    complete(responseEntity)
+  }
+
+  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext)
+    : Directive1[Future[Form]] = {
+    Directive[Tuple1[Future[Form]]] { inner =>
+      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
+        val form = formdata.parts.mapAsync(1) { p =>
+          if (p.filename.isDefined) {
+
+            // Preserve the original file suffix in the temp file name.
+            val targetPath = File.createTempFile(s"userfile_${p.name}_",
+              s"${p.filename.getOrElse("")}", rootDirectory)
+            val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath))
+            written.map(written =>
+              if (written.count > 0) {
+                Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count)))
+              } else {
+                Map.empty[Name, FormField]
+              })
+          } else {
+            val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) { (total, input) =>
+              total ++ input
+            }
+            valueFuture.map { value =>
+              Map(p.name -> Right(value.utf8String))
+            }
+          }
+        }.runFold(new Form(Map.empty[Name, FormField])) { (set, value) =>
+          new Form(set.fields ++ value)
+        }
+
+        inner(Tuple1(form))
+      }
+    }
+  }
+}
\ No newline at end of file
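
A hypothetical route wiring these directives together; the field name "jar",
the storage directory, and the downloaded file name are assumptions for
illustration, and server bootstrap is omitted:

    import java.io.File

    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route

    import org.apache.gearpump.util.FileDirective._

    object FileRoutes {
      // POST /upload with multipart form data; stores files under /tmp/jarstore.
      val upload: Route =
        path("upload") {
          post {
            uploadFileTo(new File("/tmp/jarstore")) { form =>
              form.getFile("jar") match {
                case Some(info) =>
                  complete(s"stored ${info.originFileName} (${info.length} bytes)")
                case None =>
                  complete("no file field named 'jar'")
              }
            }
          }
        }

      // GET /download streams a fixed file back as application/octet-stream.
      val download: Route =
        path("download") {
          get {
            downloadFile(new File("/tmp/jarstore/myapp.jar"))
          }
        }
    }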