You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/05/10 02:42:47 UTC
[37/49] incubator-gearpump git commit: fix GEARPUMP-118 change
package name to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
new file mode 100644
index 0000000..435b83e
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+import akka.actor._
+import akka.pattern.ask
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
+import org.apache.gearpump.cluster.master.Master._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
+
+/**
+ * AppManager is a dedicated child of Master that manages all applications.
+ */
+private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
+ extends Actor with Stash with TimeOutScheduler {
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+
+ private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID
+ private val appMasterMaxRetries: Int = 5
+ private val appMasterRetryTimeRange: Duration = 20.seconds
+
+ implicit val timeout = FUTURE_TIMEOUT
+ implicit val executionContext = context.dispatcher
+
+ // Next available appId
+ private var appId: Int = 1
+
+ // From appid to appMaster data
+ private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+
+ // Dead appmaster list
+ private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
+
+ private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
+
+ def receive: Receive = null
+
+ kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
+ context.become(waitForMasterState)
+
+ /**
+  * Initial behavior: waits for the reply to the GetKV(MASTER_GROUP, MASTER_STATE) request.
+  *
+  * On success the next appId and both AppMaster maps are restored when a MasterState was
+  * stored, then the actor switches to `receiveHandler` and replays every stashed message.
+  * On failure the parent actor is sent a PoisonPill. Any other message is stashed.
+  */
+ def waitForMasterState: Receive = {
+   case GetKVSuccess(_, result) =>
+     // A null result means no state has been stored yet; keep the defaults in that case.
+     Option(result.asInstanceOf[MasterState]).foreach { masterState =>
+       this.appId = masterState.maxId + 1
+       this.deadAppMasters = masterState.deadAppMasters
+       this.appMasterRegistry = masterState.appMasterRegistry
+     }
+     context.become(receiveHandler)
+     unstashAll()
+   case GetKVFailed(ex) =>
+     // Hand the cause to the logger so the reason for the shutdown is not lost.
+     LOG.error(
+       "Failed to get master state, shutting down master to avoid data corruption...", ex)
+     context.parent ! PoisonPill
+   case msg =>
+     LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+     stash()
+ }
+
+ /**
+  * Builds the behavior used once the master state has been loaded: the individual handlers
+  * are tried in order, the first one defined for a message wins. Logs a start-up notice each
+  * time it is evaluated.
+  */
+ def receiveHandler: Receive = {
+   LOG.info("Application Manager started. Ready for application submission...")
+   val requestHandlers = clientMsgHandler orElse appMasterMessage orElse selfMsgHandler
+   val clusterHandlers = workerMessage orElse appDataStoreService orElse terminationWatch
+   requestHandlers orElse clusterHandlers
+ }
+
+ def clientMsgHandler: Receive = {
+ case SubmitApplication(app, jar, username) =>
+ LOG.info(s"Submit Application ${app.name}($appId) by $username...")
+ val client = sender
+ if (applicationNameExist(app.name)) {
+ client ! SubmitApplicationResult(Failure(
+ new Exception(s"Application name ${app.name} already existed")))
+ } else {
+ context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent,
+ Some(client)), s"launcher${appId}_${Util.randInt()}")
+
+ val appState = new ApplicationState(appId, app.name, 0, app, jar, username, null)
+ appMasterRestartPolicies += appId ->
+ new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
+ kvService ! PutKV(appId.toString, APP_STATE, appState)
+ appId += 1
+ }
+
+ case RestartApplication(appId) =>
+ val client = sender()
+ (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+ case GetKVSuccess(_, result) =>
+ val appState = result.asInstanceOf[ApplicationState]
+ if (appState != null) {
+ LOG.info(s"Shutting down the application (restart), $appId")
+ self ! ShutdownApplication(appId)
+ self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
+ } else {
+ client ! SubmitApplicationResult(Failure(
+ new Exception(s"Failed to restart, because the application $appId does not exist.")
+ ))
+ }
+ case GetKVFailed(ex) =>
+ client ! SubmitApplicationResult(Failure(
+ new Exception(s"Unable to obtain the Master State. " +
+ s"Application $appId will not be restarted.")
+ ))
+ }
+
+ case ShutdownApplication(appId) =>
+ LOG.info(s"App Manager Shutting down application $appId")
+ val (_, info) = appMasterRegistry.getOrElse(appId, (null, null))
+ Option(info) match {
+ case Some(info) =>
+ val worker = info.worker
+ val workerPath = Option(worker).map(_.path).orNull
+ LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, executorId: $executorId")
+ cleanApplicationData(appId)
+ val shutdown = ShutdownExecutor(appId, executorId,
+ s"AppMaster $appId shutdown requested by master...")
+ sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
+ sender ! ShutdownApplicationResult(Success(appId))
+ case None =>
+ val errorMsg = s"Failed to find regisration information for appId: $appId"
+ LOG.error(errorMsg)
+ sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
+ }
+ case ResolveAppId(appId) =>
+ val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
+ if (null != appMaster) {
+ sender ! ResolveAppIdResult(Success(appMaster))
+ } else {
+ sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
+ }
+ case AppMastersDataRequest =>
+ var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
+ appMasterRegistry.foreach(pair => {
+ val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
+ val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+ val workerPath = Option(info.worker).map(worker =>
+ ActorUtil.getFullPath(context.system, worker.path))
+ appMastersData += AppMasterData(
+ AppMasterActive, id, info.appName, appMasterPath, workerPath.orNull,
+ info.submissionTime, info.startTime, info.finishTime, info.user)
+ })
+
+ deadAppMasters.foreach(pair => {
+ val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
+ val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+ val workerPath = Option(info.worker).map(worker =>
+ ActorUtil.getFullPath(context.system, worker.path))
+
+ appMastersData += AppMasterData(
+ AppMasterInActive, id, info.appName, appMasterPath, workerPath.orNull,
+ info.submissionTime, info.startTime, info.finishTime, info.user)
+ })
+
+ sender ! AppMastersData(appMastersData.toList)
+ case QueryAppMasterConfig(appId) =>
+ val config =
+ if (appMasterRegistry.contains(appId)) {
+ val (_, info) = appMasterRegistry(appId)
+ info.config
+ } else if (deadAppMasters.contains(appId)) {
+ val (_, info) = deadAppMasters(appId)
+ info.config
+ } else {
+ null
+ }
+ sender ! AppMasterConfig(config)
+
+ case appMasterDataRequest: AppMasterDataRequest =>
+ val appId = appMasterDataRequest.appId
+ val (appStatus, appMaster, info) =
+ if (appMasterRegistry.contains(appId)) {
+ val (appMaster, info) = appMasterRegistry(appId)
+ (AppMasterActive, appMaster, info)
+ } else if (deadAppMasters.contains(appId)) {
+ val (appMaster, info) = deadAppMasters(appId)
+ (AppMasterInActive, appMaster, info)
+ } else {
+ (AppMasterNonExist, null, null)
+ }
+
+ appStatus match {
+ case AppMasterActive | AppMasterInActive =>
+ val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
+ val workerPath = Option(info.worker).map(
+ worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
+ sender ! AppMasterData(
+ appStatus, appId, info.appName, appMasterPath, workerPath,
+ info.submissionTime, info.startTime, info.finishTime, info.user)
+
+ case AppMasterNonExist =>
+ sender ! AppMasterData(AppMasterNonExist)
+ }
+ }
+
+ /** Logs the outcome (success or failure reason) of an executor shutdown. */
+ def workerMessage: Receive = {
+   case ShutdownExecutorSucceed(stoppedAppId, stoppedExecutorId) =>
+     LOG.info(s"Shut down executor $stoppedExecutorId for application " +
+       s"$stoppedAppId successfully")
+   case failure: ShutdownExecutorFailed =>
+     LOG.error(failure.reason)
+ }
+
+ /** Time-out callback for the ShutdownExecutor request; it only logs an error. */
+ private def shutDownExecutorTimeOut(): Unit =
+   LOG.error("Shut down executor time out")
+
+ /**
+  * Handles RegisterAppMaster: watches the AppMaster, records it in the registry with the
+  * current time as start time, stores the updated MasterState in the KV service and
+  * acknowledges the sender with AppMasterRegistered.
+  */
+ def appMasterMessage: Receive = {
+   case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
+     val registered = registerBack.copy(startTime = System.currentTimeMillis())
+     val registeredId = registered.appId
+
+     LOG.info(s"Register AppMaster for app: $registeredId $registered")
+     context.watch(appMaster)
+     appMasterRegistry += registeredId -> (appMaster, registered)
+
+     val snapshot = MasterState(appId, appMasterRegistry, deadAppMasters)
+     kvService ! PutKV(MASTER_GROUP, MASTER_STATE, snapshot)
+     sender ! AppMasterRegistered(registeredId)
+ }
+
+ /**
+  * Stores and loads per-application data through the KV service. The application id
+  * (as a string) is used as the KV group.
+  *
+  * SaveAppData is answered with AppDataSaved or SaveAppDataFailed, GetAppData with
+  * GetAppDataResult (carrying a null value when the lookup fails). If the ask to the KV
+  * service itself fails (for example it times out) the client still receives the failure
+  * reply instead of waiting forever.
+  */
+ def appDataStoreService: Receive = {
+   case SaveAppData(appId, key, value) =>
+     // Capture the sender now; the callbacks below run after this message was processed.
+     val client = sender
+     (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
+       case PutKVSuccess =>
+         client ! AppDataSaved
+       case PutKVFailed(k, ex) =>
+         LOG.error(s"Failed to save application data of key $k for application $appId", ex)
+         client ! SaveAppDataFailed
+     }.recover {
+       case ex: Exception =>
+         LOG.error(s"Failed to save application data of key $key for application $appId", ex)
+         client ! SaveAppDataFailed
+     }
+   case GetAppData(appId, key) =>
+     val client = sender
+     (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
+       case GetKVSuccess(privateKey, value) =>
+         client ! GetAppDataResult(key, value)
+       case GetKVFailed(ex) =>
+         LOG.error(s"Failed to get application data of key $key for application $appId", ex)
+         client ! GetAppDataResult(key, null)
+     }.recover {
+       case ex: Exception =>
+         LOG.error(s"Failed to get application data of key $key for application $appId", ex)
+         client ! GetAppDataResult(key, null)
+     }
+ }
+
+ /**
+  * Reacts to the termination of a watched AppMaster.
+  *
+  * When the terminated actor is still present in the registry, its ApplicationState is
+  * looked up in the KV service and, if found, a RecoverApplication message is sent to self.
+  * Lookup failures are logged; the application is not recovered in that case.
+  */
+ def terminationWatch: Receive = {
+   case terminate: Terminated =>
+     LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, " +
+       s"network down: ${terminate.getAddressTerminated()}")
+
+     // Now we assume that the only normal way to stop the application is submitting a
+     // ShutdownApplication request
+     val application = appMasterRegistry.find { case (_, (actorRef, _)) =>
+       actorRef.compareTo(terminate.actor) == 0
+     }
+     application.foreach { case (appId, _) =>
+       (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+         case GetKVSuccess(_, result) =>
+           val appState = result.asInstanceOf[ApplicationState]
+           if (appState != null) {
+             LOG.info(s"Recovering application, $appId")
+             self ! RecoverApplication(appState)
+           } else {
+             LOG.error(s"Cannot find application state for $appId")
+           }
+         case GetKVFailed(ex) =>
+           LOG.error(s"Cannot find master state to recover", ex)
+       }.recover {
+         // The ask itself failed (e.g. timed out); without this the failure goes unnoticed.
+         case ex: Exception =>
+           LOG.error(s"Failed to query the state of application $appId for recovery", ex)
+       }
+     }
+ }
+
+ /**
+  * Handles messages the AppManager sends to itself.
+  *
+  * RecoverApplication starts a new launcher for the application as long as the
+  * application's RestartPolicy still allows a restart; otherwise an error is logged.
+  */
+ def selfMsgHandler: Receive = {
+   case RecoverApplication(state) =>
+     val id = state.appId
+     // Restart policies are only created when an application is submitted and are not part
+     // of the restored MasterState, so an application can be registered without a policy.
+     // Create the policy on demand instead of failing on the missing key.
+     val policy = appMasterRestartPolicies.getOrElse(id,
+       new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange))
+     appMasterRestartPolicies += id -> policy
+
+     if (policy.allowRestart) {
+       LOG.info(s"AppManager Recovering Application $id...")
+       context.actorOf(launcher.props(id, executorId, state.app, state.jar, state.username,
+         context.parent, None), s"launcher${id}_${Util.randInt()}")
+     } else {
+       LOG.error(s"Application $id failed too many times")
+     }
+ }
+
+ case class RecoverApplication(applicationStatus: ApplicationState)
+
+ /**
+  * Moves the application from the active registry to the dead list (stamped with the
+  * current time as finish time), stores the updated MasterState and deletes the
+  * application's KV group.
+  *
+  * @param appId id of the application to clean up
+  */
+ private def cleanApplicationData(appId: Int): Unit = {
+   appMasterRegistry.get(appId) match {
+     case Some((appMasterActor, info)) =>
+       val finished = info.copy(finishTime = System.currentTimeMillis())
+       deadAppMasters += appId -> (appMasterActor, finished)
+     case None =>
+       // Not registered: there is nothing to move to the dead list.
+   }
+   appMasterRegistry -= appId
+
+   // `this.appId` is the manager's next-id counter, not the parameter of this method.
+   val snapshot = MasterState(this.appId, appMasterRegistry, deadAppMasters)
+   kvService ! PutKV(MASTER_GROUP, MASTER_STATE, snapshot)
+   kvService ! DeleteKVGroup(appId.toString)
+ }
+
+ /** Returns true when an application in the active registry already uses `appName`. */
+ private def applicationNameExist(appName: String): Boolean =
+   appMasterRegistry.exists { case (_, (_, info)) => info.appName == appName }
+}
+
+/** Keys and state type the AppManager uses when talking to the KV service. */
+object AppManager {
+ // Key of the ApplicationState entry; AppManager stores it in the group named after the appId.
+ final val APP_STATE = "app_state"
+ // Key of the MasterState entry; AppManager stores it in the MASTER_GROUP group.
+ final val MASTER_STATE = "master_state"
+
+ /**
+ * Snapshot of the AppManager bookkeeping.
+ *
+ * AppManager writes its next-appId counter into `maxId` and, when restoring, continues
+ * from `maxId + 1`. Both maps are keyed by appId.
+ */
+ case class MasterState(
+ maxId: Int,
+ appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
+ deadAppMasters: Map[Int, (ActorRef, AppMasterRuntimeInfo)])
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
new file mode 100644
index 0000000..3e54214
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.Duration
+
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ddata.Replicator._
+import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * A replicated simple in-memory KV service. The replications are stored on all masters.
+ */
+class InMemoryKVService extends Actor with Stash {
+ import org.apache.gearpump.cluster.master.InMemoryKVService._
+
+ private val KV_SERVICE = "gearpump_kvservice"
+
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+ private val replicator = DistributedData(context.system).replicator
+ private implicit val cluster = Cluster(context.system)
+
+ // Optimize write path, we can tolerate one master down for recovery.
+ private val timeout = Duration(15, TimeUnit.SECONDS)
+ private val readMajority = ReadMajority(timeout)
+ private val writeMajority = WriteMajority(timeout)
+
+ /** Builds the replicated-map key of a group: the service prefix, "_" and the group name. */
+ private def groupKey(group: String): LWWMapKey[Any] =
+   LWWMapKey[Any](s"${KV_SERVICE}_$group")
+
+ def receive: Receive = kvService
+
+ /**
+ * Translates GetKV / PutKV / DeleteKVGroup requests into replicator operations and
+ * translates the replicator's responses back into replies for the requesting actor.
+ *
+ * Get and Update operations carry a Request (original sender plus key) as request context,
+ * which is how the response handlers know whom to answer. Replicator responses that are not
+ * matched below are not handled by this partial function.
+ */
+ def kvService: Receive = {
+
+ // Read path: read the group's map with read-majority consistency.
+ case GetKV(group: String, key: String) =>
+ val request = Request(sender(), key)
+ replicator ! Get(groupKey(group), readMajority, Some(request))
+ case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ val appData = success.get(group)
+ LOG.info(s"Successfully retrived group: ${group.id}")
+ // A key missing from an existing group is reported as a success with a null value.
+ request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
+ case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ LOG.info(s"We cannot find group $group")
+ // A missing group is also reported as a success with a null value.
+ request.client ! GetKVSuccess(request.key, null)
+ case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ val error = s"Failed to get application data, the request key is ${request.key}"
+ LOG.error(error)
+ request.client ! GetKVFailed(new Exception(error))
+
+ // Write path: add the entry to the group's map (starting from an empty LWWMap) with
+ // write-majority consistency.
+ case PutKV(group: String, key: String, value: Any) =>
+ val request = Request(sender(), key)
+ val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
+ map + (key -> value)
+ }
+ replicator ! update
+ case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ request.client ! PutKVSuccess
+ case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
+ request.client ! PutKVFailed(request.key, new Exception(error, cause))
+ case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
+ request.client ! PutKVFailed(request.key, new TimeoutException())
+
+ // Delete path: the outcome is only logged, no reply is sent to the requester.
+ case delete@DeleteKVGroup(group: String) =>
+ replicator ! Delete(groupKey(group), writeMajority)
+ case DeleteSuccess(group) =>
+ LOG.info(s"KV Group ${group.id} is deleted")
+ case ReplicationDeleteFailure(group) =>
+ LOG.error(s"Failed to delete KV Group ${group.id}...")
+ case DataDeleted(group) =>
+ LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
+ }
+}
+
+/** Message protocol of the InMemoryKVService actor. */
+object InMemoryKVService {
+ /**
+ * Request: read the value stored under `key` in `group`.
+ */
+ case class GetKV(group: String, key: String)
+
+ // Marker for the replies to GetKV.
+ trait GetKVResult
+
+ // Reply to GetKV; the service sends a null `value` when the group or the key is missing.
+ case class GetKVSuccess(key: String, value: Any) extends GetKVResult
+
+ // Reply to GetKV when the read failed.
+ case class GetKVFailed(ex: Throwable) extends GetKVResult
+
+ // Request: store `value` under `key` in `group`.
+ case class PutKV(group: String, key: String, value: Any)
+
+ // Request: delete a whole group; the service does not send a reply for it.
+ case class DeleteKVGroup(group: String)
+
+ // NOTE(review): the service's kvService handler never sends this message — confirm whether
+ // it is still used elsewhere.
+ case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
+
+ // Marker for the replies to PutKV.
+ trait PutKVResult
+
+ // Reply to PutKV when the write succeeded.
+ case object PutKVSuccess extends PutKVResult
+
+ // Reply to PutKV when the write failed or timed out.
+ case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
+
+ // Request context handed to the replicator: who asked, and for which key.
+ case class Request(client: ActorRef, key: String)
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
new file mode 100644
index 0000000..1536a23
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.lang.management.ManagementFactory
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.JavaConverters._
+import scala.collection.immutable
+
+import akka.actor._
+import akka.remote.DisassociatedEvent
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster._
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.cluster.MasterToAppMaster._
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
+import org.apache.gearpump.cluster.MasterToWorker._
+import org.apache.gearpump.cluster.WorkerToMaster._
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.jarstore.local.LocalJarStore
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util._
+
+/**
+ * Master Actor who manages resources of the whole cluster.
+ * It is like the resource manager of YARN.
+ */
+private[cluster] class Master extends Actor with Stash {
+ private val LOG: Logger = LogUtil.getLogger(getClass)
+ private val systemConfig: Config = context.system.settings.config
+ private implicit val timeout = Constants.FUTURE_TIMEOUT
+ private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
+ // Resources and resourceRequests can be dynamically constructed by
+ // heartbeat of worker and appmaster when master singleton is migrated.
+ // We don't need to persist them in cluster
+ private var appManager: ActorRef = null
+
+ private var scheduler: ActorRef = null
+
+ private var workers = new immutable.HashMap[ActorRef, WorkerId]
+
+ private val birth = System.currentTimeMillis()
+
+ private var nextWorkerId = 0
+
+ def receive: Receive = null
+
+ // Register jvm metrics
+ Metrics(context.system).register(new JvmMetricsSet(s"master"))
+
+ LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
+
+ val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+
+ private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) {
+ Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath)))
+ } else {
+ None
+ }
+
+ private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
+
+ // Maintain the list of active masters.
+ private var masters: List[MasterNode] = {
+ // Add myself into the list of initial masters.
+ List(MasterNode(hostPort.host, hostPort.port))
+ }
+
+ val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+
+ val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+ val historyMetricsService = if (metricsEnabled) {
+ val historyMetricsService = {
+ context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
+ }
+
+ val metricsReportService = context.actorOf(
+ Props(new MetricsReporterService(Metrics(context.system))))
+ historyMetricsService.tell(ReportMetrics, metricsReportService)
+ Some(historyMetricsService)
+ } else {
+ None
+ }
+
+ kvService ! GetKV(MASTER_GROUP, WORKER_ID)
+ context.become(waitForNextWorkerId)
+
+ /**
+  * Initial behavior: waits for the reply to the GetKV(MASTER_GROUP, WORKER_ID) request.
+  *
+  * On success the stored next worker id is restored (a null result only produces a
+  * warning), then the actor switches to `receiveHandler` and replays every stashed message.
+  * On failure the parent actor is sent a PoisonPill. Any other message is stashed.
+  */
+ def waitForNextWorkerId: Receive = {
+   case GetKVSuccess(_, result) =>
+     if (result != null) {
+       this.nextWorkerId = result.asInstanceOf[Int]
+     } else {
+       LOG.warn("Cannot find existing state in the distributed cluster...")
+     }
+     context.become(receiveHandler)
+     unstashAll()
+   case GetKVFailed(ex) =>
+     // Hand the cause to the logger so the reason for the shutdown is not lost.
+     LOG.error(
+       "Failed to get worker id, shutting down master to avoid data corruption...", ex)
+     context.parent ! PoisonPill
+   case msg =>
+     LOG.info(s"Get message ${msg.getClass.getSimpleName}")
+     stash()
+ }
+
+ def receiveHandler: Receive = workerMsgHandler orElse
+ appMasterMsgHandler orElse
+ onMasterListChange orElse
+ clientMsgHandler orElse
+ metricsService orElse
+ jarStoreService orElse
+ terminationWatch orElse
+ disassociated orElse
+ kvServiceMsgHandler orElse
+ ActorUtil.defaultMsgHandler(self)
+
+ def workerMsgHandler: Receive = {
+ case RegisterNewWorker =>
+ val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
+ nextWorkerId += 1
+ kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
+ val workerHostname = ActorUtil.getHostname(sender())
+ LOG.info(s"Register new from $workerHostname ....")
+ self forward RegisterWorker(workerId)
+
+ case RegisterWorker(id) =>
+ context.watch(sender())
+ sender ! WorkerRegistered(id, MasterInfo(self, birth))
+ scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
+ workers += (sender() -> id)
+ val workerHostname = ActorUtil.getHostname(sender())
+ LOG.info(s"Register Worker with id $id from $workerHostname ....")
+ case resourceUpdate: ResourceUpdate =>
+ scheduler forward resourceUpdate
+ }
+
+ def jarStoreService: Receive = {
+ case GetJarStoreServer =>
+ jarStore.foreach(_ forward GetJarStoreServer)
+ }
+
+ def kvServiceMsgHandler: Receive = {
+ case PutKVSuccess =>
+ // Skip
+ case PutKVFailed(key, exception) =>
+ LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
+ ExceptionUtils.getStackTrace(exception))
+ }
+
+ /**
+  * Answers QueryHistoryMetrics: forwards the query to the history metrics service when
+  * one was created, otherwise replies directly with an empty metrics list.
+  */
+ def metricsService: Receive = {
+   case query: QueryHistoryMetrics =>
+     historyMetricsService match {
+       case Some(service) =>
+         service forward query
+       case None =>
+         // Returns empty metrics so that we don't hang the UI
+         sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+     }
+ }
+
+ def appMasterMsgHandler: Receive = {
+ case request: RequestResource =>
+ scheduler forward request
+ case registerAppMaster: RegisterAppMaster =>
+ // Forward to appManager
+ appManager forward registerAppMaster
+ case save: SaveAppData =>
+ appManager forward save
+ case get: GetAppData =>
+ appManager forward get
+ case GetAllWorkers =>
+ sender ! WorkerList(workers.values.toList)
+ case GetMasterData =>
+ val aliveFor = System.currentTimeMillis() - birth
+ val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+ val userDir = System.getProperty("user.dir")
+
+ val masterDescription =
+ MasterSummary(
+ MasterNode(hostPort.host, hostPort.port),
+ masters,
+ aliveFor,
+ logFileDir,
+ jarStoreRootPath,
+ MasterStatus.Synced,
+ userDir,
+ List.empty[MasterActivity],
+ jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+ historyMetricsConfig = getHistoryMetricsConfig
+ )
+
+ sender ! MasterData(masterDescription)
+
+ case invalidAppMaster: InvalidAppMaster =>
+ appManager forward invalidAppMaster
+ }
+
+ import scala.util.{Failure, Success}
+
+ def onMasterListChange: Receive = {
+ case MasterListUpdated(masters: List[MasterNode]) =>
+ this.masters = masters
+ }
+
+ def clientMsgHandler: Receive = {
+ case app: SubmitApplication =>
+ LOG.debug(s"Receive from client, SubmitApplication $app")
+ appManager.forward(app)
+ case app: RestartApplication =>
+ LOG.debug(s"Receive from client, RestartApplication $app")
+ appManager.forward(app)
+ case app: ShutdownApplication =>
+ LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
+ scheduler ! ApplicationFinished(app.appId)
+ appManager.forward(app)
+ case app: ResolveAppId =>
+ LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
+ appManager.forward(app)
+ case resolve: ResolveWorkerId =>
+ LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
+ val worker = workers.find(_._2 == resolve.workerId)
+ worker match {
+ case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1))
+ case None => sender ! ResolveWorkerIdResult(Failure(
+ new Exception(s"cannot find worker ${resolve.workerId}")))
+ }
+ case AppMastersDataRequest =>
+ LOG.debug("Master received AppMastersDataRequest")
+ appManager forward AppMastersDataRequest
+ case appMasterDataRequest: AppMasterDataRequest =>
+ LOG.debug("Master received AppMasterDataRequest")
+ appManager forward appMasterDataRequest
+ case query: QueryAppMasterConfig =>
+ LOG.debug("Master received QueryAppMasterConfig")
+ appManager forward query
+ case QueryMasterConfig =>
+ sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+ }
+
+ def disassociated: Receive = {
+ case disassociated: DisassociatedEvent =>
+ LOG.info(s" disassociated ${disassociated.remoteAddress}")
+ }
+
+ /**
+  * Handles the termination of a watched actor: when it is a registered worker, the
+  * scheduler is told via WorkerTerminated and the worker is removed from the worker map.
+  */
+ def terminationWatch: Receive = {
+   case t: Terminated =>
+     val terminated = t.actor
+     LOG.info(s"worker ${terminated.path} get terminated, is it due to network reason?" +
+       t.getAddressTerminated())
+
+     LOG.info("Let's filter out dead resources...")
+     // Filters out dead worker resource
+     workers.get(terminated).foreach { workerId =>
+       scheduler ! WorkerTerminated(workerId)
+       workers -= terminated
+     }
+ }
+
+ override def preStart(): Unit = {
+ val path = ActorUtil.getFullPath(context.system, self.path)
+ LOG.info(s"master path is $path")
+ val schedulerClass = Class.forName(
+ systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
+
+ appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
+ classOf[AppManager].getSimpleName)
+ scheduler = context.actorOf(Props(schedulerClass))
+ context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
+ }
+}
+
+/** Constants and messages that belong to the Master actor. */
+object Master {
+ // KV group under which Master stores WORKER_ID and AppManager stores its master state.
+ final val MASTER_GROUP = "master_group"
+
+ // KV key of the next worker id to hand out.
+ final val WORKER_ID = "next_worker_id"
+
+ // Sent by Master to the scheduler when a registered worker has terminated.
+ case class WorkerTerminated(workerId: WorkerId)
+
+ // The master actor together with its start time; Master fills in its creation timestamp.
+ case class MasterInfo(master: ActorRef, startTime: Long = 0L)
+
+ /** Notify the subscriber that master actor list has been updated */
+ case class MasterListUpdated(masters: List[MasterNode])
+
+ object MasterInfo {
+ // Placeholder value: a null master reference and the default start time of 0.
+ def empty: MasterInfo = MasterInfo(null)
+ }
+
+ // Total and currently available slot counts.
+ case class SlotStatus(totalSlots: Int, availableSlots: Int)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
new file mode 100644
index 0000000..1429694
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.mutable
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
+
+/** Assign resource to application based on the priority of the application */
+class PriorityScheduler extends Scheduler {
+ private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
+
+ /**
+  * Ordering used by the pending-request queue: compares by priority id first and, on a tie,
+  * ranks the request with the smaller time stamp higher.
+  */
+ def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
+   override def compare(x: PendingRequest, y: PendingRequest): Int = {
+     val byPriority = x.request.priority.id - y.request.priority.id
+     if (byPriority != 0) {
+       byPriority
+     } else {
+       // Reversed on purpose: the earlier request compares as the greater one.
+       y.timeStamp.compareTo(x.timeStamp)
+     }
+   }
+ }
+
+ // Worker/application bookkeeping from the base class first, then resource requests.
+ override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
+
+ /**
+ * Drains the pending request queue in priority order and hands out slots from a snapshot of
+ * the known worker resources. Requests that cannot be satisfied (or the unsatisfied remainder
+ * of a partially satisfied ANY request) are put back into the queue after the loop.
+ */
+ override def allocateResource(): Unit = {
+ // Requests to re-enqueue once this pass is over
+ var scheduleLater = Array.empty[PendingRequest]
+ // All bookkeeping below happens on this copy; `resources` itself is not modified here
+ val resourcesSnapShot = resources.clone()
+ var allocated = Resource.empty
+ val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
+
+ // Stop as soon as the queue is empty or everything in the snapshot has been handed out
+ while (resourceRequests.nonEmpty && (allocated < totalResource)) {
+ val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
+ request.relaxation match {
+ case ANY =>
+ // Slots may come from several workers; allocateFairly also updates the snapshot
+ val allocations = allocateFairly(resourcesSnapShot, request)
+ val newAllocated = Resource(allocations.map(_.resource.slots).sum)
+ if (allocations.nonEmpty) {
+ appMaster ! ResourceAllocated(allocations.toArray)
+ }
+ if (newAllocated < request.resource) {
+ // Partially served: keep the remainder pending under the original time stamp
+ val remainingRequest = request.resource - newAllocated
+ val remainingExecutors = request.executorNum - allocations.length
+ val newResourceRequest = request.copy(resource = remainingRequest,
+ executorNum = remainingExecutors)
+ scheduleLater = scheduleLater :+
+ PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
+ }
+ allocated = allocated + newAllocated
+ case ONEWORKER =>
+ // The whole request must be served by a single worker.
+ // NOTE(review): the strict '>' skips a worker whose free resource exactly equals the
+ // request - confirm this is intended.
+ val availableResource = resourcesSnapShot.find { params =>
+ val (_, (_, resource)) = params
+ resource > request.resource
+ }
+ if (availableResource.nonEmpty) {
+ val (workerId, (worker, resource)) = availableResource.get
+ allocated = allocated + request.resource
+ appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+ workerId)))
+ resourcesSnapShot.update(workerId, (worker, resource - request.resource))
+ } else {
+ scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+ }
+ case SPECIFICWORKER =>
+ // The request names the worker that must serve it (same strict '>' as above)
+ val workerAndResource = resourcesSnapShot.get(request.workerId)
+ if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
+ val (worker, availableResource) = workerAndResource.get
+ appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
+ request.workerId)))
+ allocated = allocated + request.resource
+ resourcesSnapShot.update(request.workerId, (worker,
+ availableResource - request.resource))
+ } else {
+ scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
+ }
+ }
+ }
+ // Unserved requests go back into the priority queue for the next pass
+ for (request <- scheduleLater)
+ resourceRequests.enqueue(request)
+ }
+
+ /**
+  * Queues every incoming RequestResource, remembering the sender as the application master
+  * to answer, and immediately runs an allocation pass.
+  */
+ def resourceRequestHandler: Receive = {
+   case RequestResource(appId, request) =>
+     LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
+       s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
+     val requester = sender()
+     val pending = PendingRequest(appId, requester, request, System.currentTimeMillis())
+     resourceRequests.enqueue(pending)
+     allocateResource()
+ }
+
+ /** Drops every pending request that belongs to the given application. */
+ override def doneApplication(appId: Int): Unit = {
+   resourceRequests = resourceRequests.filterNot(pending => pending.appId == appId)
+ }
+
+ /**
+  * Spreads a request over the workers holding the most free slots, taking slots from the
+  * picked workers in an interleaved order. The passed `resources` map is updated in place
+  * with what was taken.
+  *
+  * @param resources free resource per worker; mutated by this method
+  * @param request the request to serve
+  * @return one allocation per worker that contributed slots, possibly empty
+  */
+ private def allocateFairly(
+     resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
+   : List[ResourceAllocation] = {
+   val workerCount = resources.size
+   var granted = List.empty[ResourceAllocation]
+   var unclaimed = Resource(resources.values.map(_._2.slots).sum)
+   var slotsStillWanted = request.resource
+   var executorsStillWanted = Math.min(request.executorNum, request.resource.slots)
+
+   while (executorsStillWanted > 0 && !unclaimed.isEmpty) {
+     val executorsThisRound = Math.min(workerCount, executorsStillWanted)
+     val slotsThisRound =
+       Resource(slotsStillWanted.slots * executorsThisRound / executorsStillWanted)
+
+     // Workers with the most free slots first, one worker per executor of this round.
+     val candidates = resources.toArray
+       .sortBy(_._2._2.slots)(Ordering[Int].reverse)
+       .take(executorsThisRound)
+
+     // One entry per free slot, ordered so that consecutive slots alternate between workers.
+     val slotOwners = candidates.zipWithIndex.flatMap {
+       case ((workerId, (worker, available)), position) =>
+         (0 until available.slots).map { round =>
+           ((workerId, worker), round * workerCount + position)
+         }
+     }.sortBy(_._2).map(_._1)
+
+     if (slotOwners.length < slotsThisRound.slots) {
+       // Can not satisfy the user's requirements
+       unclaimed = Resource.empty
+     } else {
+       slotOwners.take(slotsThisRound.slots).groupBy(identity).mapValues(_.length)
+         .toArray.foreach { case ((workerId, worker), slots) =>
+           resources.update(workerId, (worker, resources(workerId)._2 - Resource(slots)))
+           granted :+= ResourceAllocation(Resource(slots), worker, workerId)
+         }
+       unclaimed -= slotsThisRound
+       slotsStillWanted -= slotsThisRound
+       executorsStillWanted -= executorsThisRound
+     }
+   }
+   granted
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
new file mode 100644
index 0000000..7187c1a
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.collection.mutable
+
+import akka.actor.{Actor, ActorRef}
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.WorkerTerminated
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Base actor for schedulers, which hand out worker resources to applications. It keeps the
+ * latest reported resource per registered worker and leaves the allocation policy to
+ * subclasses.
+ */
+abstract class Scheduler extends Actor {
+   val LOG: Logger = LogUtil.getLogger(getClass)
+   // Worker id -> (worker actor, resource last reported for that worker)
+   protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
+
+   /** Handles worker registration, resource updates, worker loss and finished applications. */
+   def handleScheduleMessage: Receive = {
+     case WorkerRegistered(id, _) =>
+       if (!resources.contains(id)) {
+         LOG.info(s"Worker $id added to the scheduler")
+         resources.put(id, (sender(), Resource.empty))
+       }
+     case update @ ResourceUpdate(worker, workerId, resource) =>
+       LOG.info(s"$update...")
+       resources.get(workerId) match {
+         case Some((_, previous)) =>
+           // A growing resource means slots were given back, so pending requests may fit now.
+           val resourceReturned = resource > previous
+           resources.update(workerId, (worker, resource))
+           if (resourceReturned) {
+             allocateResource()
+           }
+           sender() ! UpdateResourceSucceed
+         case None =>
+           sender() ! UpdateResourceFailed(
+             s"ResourceUpdate failed! The worker $workerId has not been registered into master")
+       }
+     case WorkerTerminated(workerId) =>
+       // Removing an unknown id is a no-op, so no membership check is needed.
+       resources -= workerId
+     case ApplicationFinished(appId) =>
+       doneApplication(appId)
+   }
+
+   /** Tries to serve pending requests from the currently known resources. */
+   def allocateResource(): Unit
+
+   /** Called when the application with the given id has finished. */
+   def doneApplication(appId: Int): Unit
+}
+
+/** Message and bookkeeping types used by schedulers. */
+object Scheduler {
+ /** A resource request waiting to be served, with the requesting actor and a time stamp. */
+ case class PendingRequest(
+ appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+
+ /** Tells the scheduler that the application with the given id has finished. */
+ case class ApplicationFinished(appId: Int)
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
new file mode 100644
index 0000000..2f1e23d
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
+
+/** Default launcher: starts an executor process through `Util.startProcess`. */
+class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
+   private val LOG: Logger = LogUtil.getLogger(getClass)
+
+   /**
+    * Logs the classpath and starts the process. Only `options`, `classPath`, `mainClass` and
+    * `arguments` are used by this implementation.
+    */
+   override def createProcess(
+       appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+       classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
+     val renderedClassPath = classPath.mkString(File.pathSeparator)
+     LOG.info(s"Launch executor, classpath: $renderedClassPath")
+     Util.startProcess(options, classPath, mainClass, arguments)
+   }
+
+   /** This launcher keeps no per-process state, so there is nothing to clean up. */
+   override def cleanProcess(appId: Int, executorId: Int): Unit = ()
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
new file mode 100644
index 0000000..b290938
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.net.URL
+import java.util.concurrent.{Executors, TimeUnit}
+import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
+import org.apache.gearpump.cluster.MasterToWorker._
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.WorkerToMaster._
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.util.ActorSystemBooter.Daemon
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util.{TimeOutScheduler, _}
+
+/**
+ * Worker is used to track the resource on single machine, it is like
+ * the node manager of YARN.
+ *
+ * @param masterProxy masterProxy is used to resolve the master
+ */
+private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
+ private val systemConfig: Config = context.system.settings.config
+
+ private val address = ActorUtil.getFullPath(context.system, self.path)
+ private var resource = Resource.empty
+ private var allocatedResources = Map[ActorRef, Resource]()
+ private var executorsInfo = Map[ActorRef, ExecutorSlots]()
+ private var id: WorkerId = WorkerId.unspecified
+ private val createdTime = System.currentTimeMillis()
+ private var masterInfo: MasterInfo = null
+ private var executorNameToActor = Map.empty[String, ActorRef]
+ private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
+ private val jarStoreService = JarStoreService.get(systemConfig)
+ jarStoreService.init(systemConfig, context.system)
+
+ private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+ private val resourceUpdateTimeoutMs = 30000 // Milliseconds
+
+ private var totalSlots: Int = 0
+
+ val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+ var historyMetricsService: Option[ActorRef] = None
+
+ override def receive: Receive = null
+ var LOG: Logger = LogUtil.getLogger(getClass)
+
+ // Behavior once the worker is registered: the handlers are tried in the order listed and
+ // ActorUtil.defaultMsgHandler is the fallback. Reads masterInfo, so it must only be
+ // evaluated after a WorkerRegistered message has set it.
+ def service: Receive =
+ appMasterMsgHandler orElse
+ clientMessageHandler orElse
+ metricsService orElse
+ terminationWatch(masterInfo.master) orElse
+ ActorUtil.defaultMsgHandler(self)
+
+ /** Forwards history-metrics queries to the metrics service, if one is running. */
+ def metricsService: Receive = {
+   case query: QueryHistoryMetrics =>
+     historyMetricsService match {
+       case Some(history) =>
+         history forward query
+       case None =>
+         // Returns empty metrics so that we don't hang the UI
+         sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+     }
+ }
+
+ private var metricsInitialized = false
+
+ val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+ /**
+  * Registers the JVM metrics set of this worker and, when metrics are enabled, starts the
+  * history-metrics and metrics-reporter child actors.
+  */
+ private def initializeMetrics(): Unit = {
+   // Registers jvm metrics
+   val metricsSetName = "worker" + WorkerId.render(id)
+   Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
+
+   historyMetricsService =
+     if (!metricsEnabled) {
+       None
+     } else {
+       val history = context.actorOf(
+         Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
+       val reporter = context.actorOf(
+         Props(new MetricsReporterService(Metrics(context.system))))
+       // The reporter is passed as the sender of ReportMetrics.
+       history.tell(ReportMetrics, reporter)
+       Some(history)
+     }
+ }
+
+ /**
+ * Behavior while waiting for the master to acknowledge the registration. On WorkerRegistered
+ * it stores the assigned id and master, cancels the timeout ticker, watches the master,
+ * reports the current resource and switches to the `service` behavior.
+ *
+ * @param timeoutTicker cancelled as soon as the registration is confirmed
+ */
+ def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
+
+ // If the master gets disconnected, WorkerRegistered may be triggered multiple times.
+ case WorkerRegistered(id, masterInfo) =>
+ this.id = id
+
+ // Adds the flag check, so that we don't re-initialize the metrics when the worker
+ // re-registers itself.
+ if (!metricsInitialized) {
+ initializeMetrics()
+ metricsInitialized = true
+ }
+
+ this.masterInfo = masterInfo
+ timeoutTicker.cancel()
+ context.watch(masterInfo.master)
+ // Re-create the logger now that the worker id is known
+ this.LOG = LogUtil.getLogger(getClass, worker = id)
+ LOG.info(s"Worker is registered. " +
+ s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
+ sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
+ resourceUpdateTimeoutMs, updateResourceTimeOut())
+ context.become(service)
+ }
+
+ /** Timeout callback for resource updates sent to the master; only logs the failure. */
+ private def updateResourceTimeOut(): Unit =
+   LOG.error("Update worker resource time out")
+
+ /**
+ * Handles executor life-cycle requests (launch, shutdown, resource change), the master's
+ * replies to resource updates, and worker status queries.
+ */
+ def appMasterMsgHandler: Receive = {
+ case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
+ val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+ val executorToStop = executorNameToActor.get(actorName)
+ if (executorToStop.isDefined) {
+ LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
+ s"due to: $reason")
+ // Forwarded, so the executor watcher sees the original sender
+ executorToStop.get.forward(shutdown)
+ } else {
+ LOG.error(s"Cannot find executor $actorName, ignore this message")
+ sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
+ }
+ case launch: LaunchExecutor =>
+ LOG.info(s"$launch")
+ if (resource < launch.resource) {
+ sender ! ExecutorLaunchRejected("There is no free resource on this machine")
+ } else {
+ val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
+
+ val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
+ jarStoreService, executorProcLauncher))
+ executorNameToActor += actorName -> executor
+
+ // Reserve the requested resource for the new executor and tell the master
+ resource = resource - launch.resource
+ allocatedResources = allocatedResources + (executor -> launch.resource)
+
+ reportResourceToMaster()
+ executorsInfo += executor ->
+ ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
+ // Watched so that terminationWatch is notified when the executor stops
+ context.watch(executor)
+ }
+ case UpdateResourceFailed(reason, ex) =>
+ // The master rejected the resource update; the worker stops itself
+ LOG.error(reason)
+ context.stop(self)
+ case UpdateResourceSucceed =>
+ LOG.info(s"Update resource succeed")
+ case GetWorkerData(workerId) =>
+ // The requested workerId is not checked; this worker always answers with its own data
+ val aliveFor = System.currentTimeMillis() - createdTime
+ val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+ val userDir = System.getProperty("user.dir")
+ sender ! WorkerData(WorkerSummary(
+ id, "active",
+ address,
+ aliveFor,
+ logDir,
+ executorsInfo.values.toArray,
+ totalSlots,
+ resource.slots,
+ userDir,
+ jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+ resourceManagerContainerId = systemConfig.getString(
+ GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
+ historyMetricsConfig = getHistoryMetricsConfig)
+ )
+ case ChangeExecutorResource(appId, executorId, usedResource) =>
+ // Does nothing when the executor is unknown or has no recorded allocation
+ for (executor <- executorActorRef(appId, executorId);
+ allocatedResource <- allocatedResources.get(executor)) {
+
+ allocatedResources += executor -> usedResource
+ // Give back the difference between the old allocation and what is actually used
+ resource = resource + allocatedResource - usedResource
+ reportResourceToMaster()
+
+ if (usedResource == Resource(0)) {
+ allocatedResources -= executor
+ // Stop the executor if there is no resource bound to it.
+ LOG.info(s"Shutdown executor $executorId because the resource used is zero")
+ executor ! ShutdownExecutor(appId, executorId,
+ "Shutdown executor because the resource used is zero")
+ }
+ }
+ }
+
+ /** Sends the currently free resource to the master, with a timeout callback. */
+ private def reportResourceToMaster(): Unit = {
+   val update = ResourceUpdate(self, id, resource)
+   sendMsgWithTimeOutCallBack(masterInfo.master, update, resourceUpdateTimeoutMs,
+     updateResourceTimeOut())
+ }
+
+ /** Looks up the watcher actor of an executor by its derived actor name, if it is known. */
+ private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] =
+   executorNameToActor.get(ActorUtil.actorNameForExecutor(appId, executorId))
+
+ /**
+  * Answers configuration queries: the filtered system config when the query targets this
+  * worker, an empty config otherwise.
+  */
+ def clientMessageHandler: Receive = {
+   case QueryWorkerConfig(workerId) =>
+     val visibleConfig =
+       if (this.id == workerId) ClusterConfig.filterOutDefaultConfig(systemConfig)
+       else ConfigFactory.empty
+     sender ! WorkerConfig(visibleConfig)
+ }
+
+ /**
+  * Repeatedly asks the master proxy to register this worker under its existing id and kills
+  * the worker when no confirmation cancels the returned handle within the timeout.
+  *
+  * @param workerId id to register with
+  * @param timeOutSeconds how long to keep retrying before giving up
+  */
+ private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
+   val register: () => Unit = () => masterProxy ! RegisterWorker(workerId)
+   val giveUp: () => Unit = () => {
+     LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
+       s"seconds, abort and kill the worker...")
+     self ! PoisonPill
+   }
+   repeatActionUtil(timeOutSeconds, register, giveUp)
+ }
+
+ /**
+  * Handles the termination of the watched master and of executor children.
+  *
+  * When the master terminates, the worker goes back to the registration state and retries
+  * for up to 30 seconds. When an executor child terminates, all bookkeeping for it is
+  * dropped and any resource still allocated to it is returned and reported to the master.
+  *
+  * @param master the master this worker is currently registered with
+  */
+ def terminationWatch(master: ActorRef): Receive = {
+   case Terminated(actor) =>
+     if (actor.compareTo(master) == 0) {
+       // The master is gone. Try to register with a new one; retryRegisterWorker kills the
+       // worker only if that does not succeed in time.
+       LOG.info(s"Master cannot be contacted, find a new master ...")
+       context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
+     } else if (ActorUtil.isChildActorPath(self, actor)) {
+       // One executor is down. Resolve its name for the log before the mapping is removed.
+       LOG.info(s"Executor is down ${getExecutorName(actor)}")
+
+       // Always forget the dead executor, even when its allocation was already released by
+       // a zero-resource ChangeExecutorResource. Otherwise it would stay in the worker
+       // summary and in the name lookup forever.
+       executorsInfo -= actor
+       executorNameToActor = executorNameToActor.filterNot(_._2 == actor)
+
+       allocatedResources.get(actor).foreach { allocated =>
+         resource = resource + allocated
+         allocatedResources = allocatedResources - actor
+         sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
+           resourceUpdateTimeoutMs, updateResourceTimeOut())
+       }
+     }
+ }
+
+ /** Reverse lookup: the registered actor name of the given executor actor, if any. */
+ private def getExecutorName(actorRef: ActorRef): Option[String] =
+   executorNameToActor.collectFirst { case (name, ref) if ref == actorRef => name }
+
+ /**
+  * Instantiates the launcher class named by GEARPUMP_EXECUTOR_PROCESS_LAUNCHER through its
+  * single-argument `Config` constructor.
+  */
+ private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
+   val launcherClassName = systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER)
+   val constructor = Class.forName(launcherClassName).getConstructor(classOf[Config])
+   constructor.newInstance(systemConfig).asInstanceOf[ExecutorProcessLauncher]
+ }
+
+ import context.dispatcher
+ /**
+ * Reads the configured slot count, makes all slots available, asks the master proxy to
+ * register a new worker and waits for the confirmation for up to 30 seconds.
+ */
+ override def preStart(): Unit = {
+ LOG.info(s"RegisterNewWorker")
+ totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
+ this.resource = Resource(totalSlots)
+ masterProxy ! RegisterNewWorker
+ // `receive` itself is null, so the behavior is installed here before any message arrives
+ context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
+ }
+
+ /**
+  * Creates the timeout guard for the initial registration: nothing is retried, but the
+  * worker is killed when the returned handle is not cancelled within `seconds`.
+  *
+  * @param seconds how long to wait for the master's confirmation
+  */
+ private def registerTimeoutTicker(seconds: Int): Cancellable = {
+   // `()` is the unit value; `Unit` would be the companion object, discarded by accident.
+   repeatActionUtil(seconds, () => (), () => {
+     LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
+       s"abort and kill the worker...")
+     self ! PoisonPill
+   })
+ }
+
+ /**
+  * Runs `action` immediately and then every 2 seconds, and runs `onTimeout` once after
+  * `seconds`. Cancelling the returned handle cancels both schedules.
+  */
+ private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
+   : Cancellable = {
+   val akkaScheduler = context.system.scheduler
+   val repeating = akkaScheduler.schedule(Duration.Zero,
+     Duration(2, TimeUnit.SECONDS))(action())
+   val deadline = akkaScheduler.scheduleOnce(seconds.seconds)(onTimeout())
+   new Cancellable {
+     def cancel(): Boolean = {
+       // Both schedules are cancelled unconditionally; only the result is combined.
+       val repeatingCancelled = repeating.cancel()
+       val deadlineCancelled = deadline.cancel()
+       repeatingCancelled && deadlineCancelled
+     }
+
+     def isCancelled: Boolean = repeating.isCancelled && deadline.isCancelled
+   }
+ }
+
+ /** Shuts down the IO thread pool and terminates the whole actor system of this worker. */
+ override def postStop(): Unit = {
+ LOG.info(s"Worker is going down....")
+ ioPool.shutdown()
+ context.system.terminate()
+ }
+}
+
+private[cluster] object Worker {
+
+ /** Outcome of an executor: its exit value on success, or the failure that ended it. */
+ case class ExecutorResult(result: Try[Int])
+
+ class ExecutorWatcher(
+ launch: LaunchExecutor,
+ masterInfo: MasterInfo,
+ ioPool: ExecutionContext,
+ jarStoreService: JarStoreService,
+ procLauncher: ExecutorProcessLauncher) extends Actor {
+ import launch.{appId, executorId, resource}
+
+ private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
+
+ // Effective executor configuration: the akka config from the submission (empty when it is
+ // absent) resolved against the config of this worker.
+ val executorConfig: Config = {
+   val submissionConfig = (for {
+     jvmConfig <- Option(launch.executorJvmConfig)
+     akkaConfig <- Option(jvmConfig.executorAkkaConfig)
+   } yield akkaConfig).getOrElse(ConfigFactory.empty())
+
+   resolveExecutorConfig(context.system.settings.config, submissionConfig)
+ }
+
+ // Worker settings win for the paths stripped from the submission config below; for every
+ // other path the submission config wins and the worker config is only the fallback.
+ private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
+   val workerOwnedPaths = List(
+     GEARPUMP_HOSTNAME,
+     GEARPUMP_CLUSTER_MASTERS,
+     GEARPUMP_HOME,
+     GEARPUMP_LOG_DAEMON_DIR,
+     GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
+   val merged = workerOwnedPaths
+     .foldLeft(submissionConfig)((config, path) => config.withoutPath(path))
+     .withFallback(workerConfig)
+
+   // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
+   val tickDuration = merged.getInt(AKKA_SCHEDULER_TICK_DURATION)
+   val adjusted =
+     if (akka.util.Helpers.isWindows && tickDuration < 10) {
+       LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
+       merged.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
+     } else {
+       merged
+     }
+
+   // Excludes reference.conf, and JVM properties..
+   ClusterConfig.filterOutDefaultConfig(adjusted)
+ }
+
+ implicit val executorService = ioPool
+
+ // Handle on the running executor. Depending on the
+ // GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS setting, the executor is either a
+ // child actor in this JVM or a separate process created by createProcess.
+ private val executorHandler = {
+ val ctx = launch.executorJvmConfig
+
+ if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
+ new ExecutorHandler {
+ // Completed by the InJvmExecutor child when it fails or stops
+ val exitPromise = Promise[Int]()
+ val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
+
+ override def destroy(): Unit = {
+ context.stop(app)
+ }
+ override def exitValue: Future[Int] = {
+ exitPromise.future
+ }
+ }
+ } else {
+ createProcess(ctx)
+ }
+ }
+
+ /**
+ * Launches the executor as a separate process. The preparation (fetching the application
+ * jar, writing the config file, building the classpath and the JVM options) and the launch
+ * itself run in a Future on the implicit execution context of this class.
+ *
+ * @param ctx JVM settings of the executor to launch
+ * @return a handler to destroy the process and to observe its exit value
+ */
+ private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
+
+ val process = Future {
+ // Copy the application jar, if any, from the jar store into a local temp file
+ val jarPath = ctx.jar.map { appJar =>
+ val tempFile = File.createTempFile(appJar.name, ".jar")
+ jarStoreService.copyToLocalFile(tempFile, appJar.filePath)
+ val file = new URL("file:" + tempFile)
+ file.getFile
+ }
+
+ // Persist the resolved executor config so that the new JVM can load it
+ val configFile = {
+ val configFile = File.createTempFile("gearpump", ".conf")
+ ClusterConfig.saveConfig(executorConfig, configFile)
+ val file = new URL("file:" + configFile)
+ file.getFile
+ }
+
+ // Current classpath without the daemon libraries, plus the configured entries and the jar
+ val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
+ ctx.classPath.map(path => expandEnviroment(path)) ++
+ jarPath.map(Array(_)).getOrElse(Array.empty[String])
+
+ val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
+ val logArgs = List(
+ s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
+ s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
+ s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
+ s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
+ val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
+
+ val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
+
+ // Remote debug executor process
+ val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
+ val remoteDebugConfig = if (remoteDebugFlag) {
+ // The .get rethrows when no free port could be found, which fails the whole Future
+ val availablePort = Util.findFreePort().get
+ List(
+ "-Xdebug",
+ s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
+ s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
+ )
+ } else {
+ List.empty[String]
+ }
+
+ // Optional GC logging into the application log directory
+ val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
+ val verboseGCConfig = if (verboseGCFlag) {
+ List(
+ s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
+ "-verbose:gc",
+ "-XX:+PrintGCDetails",
+ "-XX:+PrintGCDateStamps",
+ "-XX:+PrintTenuringDistribution",
+ "-XX:+PrintGCApplicationConcurrentTime",
+ "-XX:+PrintGCApplicationStoppedTime"
+ )
+ } else {
+ List.empty[String]
+ }
+
+ val ipv4 = List(s"-D${PREFER_IPV4}=true")
+
+ val options = ctx.jvmArguments ++ username ++
+ logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
+
+ LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}")
+ val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
+ options, classPath, ctx.mainClass, ctx.arguments)
+
+ // Keep the temp file paths so that destroy() can delete them
+ ProcessInfo(process, jarPath, configFile)
+ }
+
+ new ExecutorHandler {
+
+ // Makes sure the process and the temp files are only cleaned up once
+ var destroyed = false
+
+ override def destroy(): Unit = {
+ LOG.info(s"Destroy executor process ${ctx.mainClass}")
+ if (!destroyed) {
+ destroyed = true
+ // Runs only if the launch Future succeeded; a failed launch leaves nothing to destroy
+ process.foreach { info =>
+ info.process.destroy()
+ info.jarPath.foreach(new File(_).delete())
+ new File(info.configFile).delete()
+ }
+ }
+ }
+
+ override def exitValue: Future[Int] = {
+ process.flatMap { info =>
+ val exit = info.process.exitValue()
+ // A non-zero exit value is reported as a failed Future
+ if (exit == 0) {
+ Future.successful(0)
+ } else {
+ Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
+ s"error summary: ${info.process.logger.error}"))
+ }
+ }
+ }
+ }
+ }
+
+ /** Replaces the `<GEARPUMP_HOME>` placeholder in a classpath entry by the configured home. */
+ private def expandEnviroment(path: String): String = {
+   // TODO: extend this to support more environment.
+   val placeholder = s"<${GEARPUMP_HOME}>"
+   val home = executorConfig.getString(GEARPUMP_HOME)
+   path.replace(placeholder, home)
+ }
+
+ /**
+  * Once the executor has finished, lets the launcher clean up and sends the outcome to this
+  * actor as an ExecutorResult.
+  */
+ override def preStart(): Unit = {
+   val exit = executorHandler.exitValue
+   exit.onComplete { outcome =>
+     procLauncher.cleanProcess(appId, executorId)
+     self ! ExecutorResult(outcome)
+   }
+ }
+
+ /** Makes sure the executor is destroyed whenever this watcher actor stops. */
+ override def postStop(): Unit = {
+ executorHandler.destroy()
+ }
+
+ // Path fragments of the daemon-only library folders; the folders are under ${GEARPUMP_HOME}
+ val daemonPathPattern = List("daemon", "yarn").map(folder => "lib" + File.separator + folder)
+
+ /**
+  * Destroys the executor on request and acknowledges it; logs the outcome when the executor
+  * ends by itself. The watcher stops itself in both cases.
+  */
+ override def receive: Receive = {
+   case ShutdownExecutor(appId, executorId, reason: String) =>
+     executorHandler.destroy()
+     sender ! ShutdownExecutorSucceed(appId, executorId)
+     context.stop(self)
+   case ExecutorResult(Success(exit)) =>
+     LOG.info("Executor exit normally with exit value " + exit)
+     context.stop(self)
+   case ExecutorResult(Failure(e)) =>
+     LOG.error("Executor exit with errors", e)
+     context.stop(self)
+ }
+
+ /** Formats a millisecond timestamp with the pattern `yyyy-MM-dd-HH-mm`. */
+ private def getFormatedTime(timestamp: Long): String =
+   new java.text.SimpleDateFormat("yyyy-MM-dd-HH-mm").format(timestamp)
+
+ /** Keeps only the classpath entries that do not point into a daemon library folder. */
+ private def filterOutDaemonLib(classPath: Array[String]): Array[String] =
+   classPath.filter(entry => !matchDaemonPattern(entry))
+
+ /** True when the path contains one of the daemon library folder fragments. */
+ private def matchDaemonPattern(path: String): Boolean =
+   daemonPathPattern.exists(fragment => path.contains(fragment))
+ }
+
+ /** Handle on a launched executor, whether it runs in this JVM or as a separate process. */
+ trait ExecutorHandler {
+ /** Stops the executor. */
+ def destroy(): Unit
+ /** Completes with the exit value of the executor, or fails when it ended with an error. */
+ def exitValue: Future[Int]
+ }
+
+ /** A launched executor process plus the local jar and config file paths created for it. */
+ case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
+
+ /**
+  * Starts the executor in the same JVM as worker.
+  *
+  * The `exit` promise is failed with the first child failure seen by the supervisor
+  * strategy, or completed with exit code 0 when this actor stops without a failure.
+  *
+  * @param launch the launch request; its first two JVM arguments are passed to Daemon
+  * @param exit promise completed exactly once with the outcome of the executor
+  */
+ class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
+   extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
+   private val exitCode = 0
+
+   override val supervisorStrategy =
+     OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
+       case ex: Throwable =>
+         LOG.error(s"system $name stopped ", ex)
+         // tryFailure instead of failure: the decider may run again after the promise is
+         // already completed, and failure would then throw IllegalStateException.
+         exit.tryFailure(ex)
+         Stop
+     }
+
+   override def postStop(): Unit = {
+     // No-op when the supervisor strategy has already failed the promise.
+     exit.trySuccess(exitCode)
+     ()
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
new file mode 100644
index 0000000..d34f5c2
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/dfs/DFSJarStoreService.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.dfs
+
+import java.io.File
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.slf4j.Logger
+
+import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
+import org.apache.gearpump.util.{Constants, LogUtil}
+
+/**
+ * DFSJarStoreService stores the uploaded jar on HDFS.
+ */
+class DFSJarStoreService extends JarStoreService {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  // Assigned by init(); stays null until init() has been called.
+  private var rootPath: Path = null
+
+  override val scheme: String = "hdfs"
+
+  /**
+   * Reads the jar store root path from `config` and creates that directory when it
+   * does not exist yet, requesting FsAction.ALL for user, group and others.
+   *
+   * @param config configuration holding Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH
+   * @param actorRefFactory not used by this implementation
+   */
+  override def init(config: Config, actorRefFactory: ActorSystem): Unit = {
+    rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH))
+    val fs = rootPath.getFileSystem(new Configuration())
+    if (!fs.exists(rootPath)) {
+      fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+    }
+  }
+
+  /**
+   * This function will copy the remote file to local file system, called from client side.
+   *
+   * @param localFile The destination of file path
+   * @param remotePath The remote file path from JarStore
+   */
+  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
+    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${remotePath}")
+    val filePath = new Path(rootPath, remotePath.path)
+    val fs = filePath.getFileSystem(new Configuration())
+    val target = new Path(localFile.toURI().toString)
+    fs.copyToLocalFile(filePath, target)
+  }
+
+  /**
+   * This function will copy the local file to the remote JarStore, called from client side.
+   *
+   * The remote name is a random non-negative number rendered as a decimal string.
+   *
+   * @param localFile The local file
+   * @return the path of the copy, relative to the jar store root
+   */
+  override def copyFromLocal(localFile: File): FilePath = {
+    // Clear the sign bit instead of calling Math.abs: Math.abs(Long.MinValue) is still
+    // negative and would produce a name starting with '-'.
+    val randomName = new java.util.Random().nextLong() & Long.MaxValue
+    val remotePath = FilePath(randomName.toString)
+    LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${remotePath}")
+    val filePath = new Path(rootPath, remotePath.path)
+    val fs = filePath.getFileSystem(new Configuration())
+    fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath)
+    remotePath
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
new file mode 100644
index 0000000..9bd7071
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStore.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore.local
+
+import java.io.File
+
+import akka.actor.{Actor, Stash}
+import akka.pattern.pipe
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import org.apache.gearpump.util._
+
+/**
+ * LocalJarStore stores the uploaded jar on local disk.
+ */
+class LocalJarStore(rootDirPath: String) extends Actor with Stash {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME)
+  val rootDirectory = new File(rootDirPath)
+
+  FileUtils.forceMkdir(rootDirectory)
+
+  // Port 0 is passed to FileServer; the port actually in use arrives later as FileServer.Port.
+  val server = new FileServer(context.system, host, 0, rootDirectory)
+
+  implicit val timeout = Constants.FUTURE_TIMEOUT
+  implicit val executionContext = context.dispatcher
+
+  // The result of the start future is delivered to this actor as a message.
+  server.start pipeTo self
+
+  /**
+   * Initial behavior: waits for the file server port and stashes everything else so
+   * that it can be replayed once the port is known.
+   */
+  def receive: Receive = {
+    case FileServer.Port(port) =>
+      context.become(listen(port))
+      unstashAll()
+    case akka.actor.Status.Failure(ex) =>
+      // pipeTo turns a failed start future into Status.Failure; report it instead of
+      // silently stashing it together with the regular requests.
+      LOG.error("Failed to start the jar store file server", ex)
+    case _ =>
+      stash()
+  }
+
+  /**
+   * Behavior once the file server port is known: answers GetJarStoreServer with the
+   * HTTP address built from `host` and `port`.
+   */
+  def listen(port: Int): Receive = {
+    case GetJarStoreServer =>
+      sender ! JarStoreServerAddress(s"http://$host:$port/")
+  }
+
+  override def postStop(): Unit = {
+    server.stop
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
new file mode 100644
index 0000000..1ab103f
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/jarstore/local/LocalJarStoreService.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.jarstore.local
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.pattern.ask
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress}
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.jarstore.{FilePath, JarStoreService}
+import org.apache.gearpump.util._
+
+/**
+ * LocalJarStoreService stores the uploaded jar on local disk.
+ */
+class LocalJarStoreService extends JarStoreService {
+  private def LOG: Logger = LogUtil.getLogger(getClass)
+  private implicit val timeout = Constants.FUTURE_TIMEOUT
+  // Both references are assigned by init() and stay null until then.
+  private var system: akka.actor.ActorSystem = null
+  private var master: ActorRef = null
+  private implicit def dispatcher: ExecutionContext = system.dispatcher
+
+  override val scheme: String = "file"
+
+  /**
+   * Remembers the actor system and creates a MasterProxy actor for the masters listed
+   * under Constants.GEARPUMP_CLUSTER_MASTERS.
+   */
+  override def init(config: Config, system: ActorSystem): Unit = {
+    this.system = system
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
+      .asScala.flatMap(Util.parseHostList)
+    master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}")
+  }
+
+  // Lazy because `master` and `system` only exist after init(). mapTo fails the future
+  // with a ClassCastException right away when the reply has an unexpected type.
+  private lazy val client = (master ? GetJarStoreServer).mapTo[JarStoreServerAddress]
+    .map(address => new FileServer.Client(system, address.url))
+
+  /**
+   * This function will copy the remote file to local file system, called from client side.
+   *
+   * Blocks for at most 60 seconds. A failed download is rethrown to the caller.
+   *
+   * @param localFile The destination of file path
+   * @param remotePath The remote file path from JarStore
+   */
+  override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = {
+    LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath")
+    val future = client.flatMap(_.download(remotePath, localFile))
+    // Await.result, not Await.ready: ready only waits for completion and would hide a
+    // failed download from the caller.
+    Await.result(future, Duration(60, TimeUnit.SECONDS))
+  }
+
+  /**
+   * This function will copy the local file to the remote JarStore, called from client side.
+   *
+   * Blocks for at most 60 seconds. A failed upload is rethrown to the caller.
+   *
+   * @param localFile The local file
+   * @return the path of the uploaded file as reported by the file server client
+   */
+  override def copyFromLocal(localFile: File): FilePath = {
+    val future = client.flatMap(_.upload(localFile))
+    Await.result(future, Duration(60, TimeUnit.SECONDS))
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala b/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
new file mode 100644
index 0000000..66bb9ba
--- /dev/null
+++ b/daemon/src/main/scala/org/apache/gearpump/util/FileDirective.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.File
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
+import akka.stream.Materializer
+import akka.stream.scaladsl.FileIO
+import akka.util.ByteString
+
+/**
+ * FileDirective is a set of Akka-HTTP directives to upload/download
+ * huge binary files to/from an Akka-HTTP server.
+ */
+object FileDirective {
+
+  // Form field name
+  type Name = String
+
+  // Chunk size, in bytes, passed to FileIO.fromFile when a file is downloaded.
+  val CHUNK_SIZE = 262144
+
+  /**
+   * File information after a file is uploaded to server.
+   *
+   * @param originFileName original file name when user upload it in browser.
+   * @param file file name after the file is saved to server.
+   * @param length the length of the file
+   */
+  case class FileInfo(originFileName: String, file: File, length: Long)
+
+  /**
+   * Parsed multipart form: every field is either an uploaded file or a text value.
+   *
+   * @param fields form content keyed by form field name
+   */
+  class Form(val fields: Map[Name, FormField]) {
+
+    /** Returns the uploaded file stored under `fieldName`, or None for a text or missing field. */
+    def getFile(fieldName: String): Option[FileInfo] = {
+      fields.get(fieldName).flatMap {
+        case Left(file) => Option(file)
+        case Right(_) => None
+      }
+    }
+
+    /** Returns the text value stored under `fieldName`, or None for a file or missing field. */
+    def getValue(fieldName: String): Option[String] = {
+      fields.get(fieldName).flatMap {
+        case Left(_) => None
+        case Right(value) => Option(value)
+      }
+    }
+  }
+
+  // Left: an uploaded file, Right: a plain text value.
+  type FormField = Either[FileInfo, String]
+
+  /**
+   * Directive to upload files. It stores the uploaded files in the default temporary
+   * directory and provides a Form that maps form field names to their content.
+   */
+  def uploadFile: Directive1[Form] = {
+    // A null directory makes File.createTempFile fall back to the default temp directory.
+    uploadFileTo(null)
+  }
+
+  /**
+   * Store the uploaded files to specific rootDirectory.
+   *
+   * @param rootDirectory directory to store the files.
+   * @return directive providing the parsed Form once all parts have been consumed
+   */
+  def uploadFileTo(rootDirectory: File): Directive1[Form] = {
+    Directive[Tuple1[Form]] { inner =>
+      extractMaterializer {implicit mat =>
+        extractExecutionContext {implicit ec =>
+          uploadFileImpl(rootDirectory)(mat, ec) { formFuture =>
+            ctx => {
+              formFuture.map(form => inner(Tuple1(form))).flatMap(route => route(ctx))
+            }
+          }
+        }
+      }
+    }
+  }
+
+  // Downloads file from server
+  def downloadFile(file: File): Route = {
+    val responseEntity = HttpEntity(
+      MediaTypes.`application/octet-stream`,
+      file.length,
+      FileIO.fromFile(file, CHUNK_SIZE))
+    complete(responseEntity)
+  }
+
+  /**
+   * Consumes the multipart request one part at a time. A part that carries a file name
+   * is written to a temp file under `rootDirectory`; any other part is read fully into
+   * memory as a UTF-8 string.
+   *
+   * NOTE(review): the part name and file name come from the client and end up in the
+   * temp file name; confirm that callers are fine with that.
+   */
+  private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext)
+    : Directive1[Future[Form]] = {
+    Directive[Tuple1[Future[Form]]] { inner =>
+      entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) =>
+        val form = formdata.parts.mapAsync(1) { p =>
+          p.filename match {
+            case Some(fileName) =>
+              // Reserve the suffix
+              val targetPath = File.createTempFile(s"userfile_${p.name}_", fileName, rootDirectory)
+              p.entity.dataBytes.runWith(FileIO.toFile(targetPath)).map { ioResult =>
+                if (ioResult.count > 0) {
+                  Map(p.name -> Left(FileInfo(fileName, targetPath, ioResult.count)))
+                } else {
+                  // Nothing was uploaded for this part; the empty temp file is not
+                  // referenced by the Form, so remove it instead of leaving it on disk.
+                  targetPath.delete()
+                  Map.empty[Name, FormField]
+                }
+              }
+            case None =>
+              val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) {(total, input) =>
+                total ++ input
+              }
+              valueFuture.map{value =>
+                Map(p.name -> Right(value.utf8String))
+              }
+          }
+        }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) =>
+          new Form(set.fields ++ value)
+        }
+
+        inner(Tuple1(form))
+      }
+    }
+  }
+}
\ No newline at end of file