You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/14 11:55:23 UTC
[2/4] incubator-gearpump git commit: [GEARPUMP-224] merge
gearpump-daemon to gearpump-core
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
deleted file mode 100644
index 9a3a119..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.util.{Failure, Success}
-
-import akka.actor._
-import akka.pattern.ask
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
-import org.apache.gearpump.cluster.AppMasterToWorker._
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
-import org.apache.gearpump.cluster.MasterToClient._
-import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import org.apache.gearpump.cluster.master.AppManager._
-import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
-import org.apache.gearpump.cluster.master.Master._
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}
-
-/**
- * AppManager is a dedicated child of Master that manages all applications.
- *
- * On construction it asks the KV service for the persisted [[AppManager.MasterState]] and
- * stashes every other message until the answer arrives. If the state cannot be read it stops
- * its parent (the Master) to avoid data corruption.
- *
- * @param kvService actor speaking the InMemoryKVService protocol (GetKV / PutKV / DeleteKVGroup)
- * @param launcher factory for the actor that launches an application's AppMaster
- */
-private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
-  extends Actor with Stash with TimeOutScheduler {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
-  private val appMasterMaxRetries: Int = 5
-  private val appMasterRetryTimeRange: Duration = 20.seconds
-
-  implicit val timeout = FUTURE_TIMEOUT
-  implicit val executionContext = context.dispatcher
-
-  // Next available appId
-  private var nextAppId: Int = 1
-
-  // From appId to appMaster data
-  // Applications not in activeAppMasters or deadAppMasters are in pending status
-  private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]
-
-  // Active appMaster list where applications are in active status
-  private var activeAppMasters = Set.empty[Int]
-
-  // Dead appMaster list where applications are in inactive status
-  private var deadAppMasters = Set.empty[Int]
-
-  // Restart policies are not part of the persisted MasterState, so this map starts empty
-  // whenever the AppManager restores its state. Read it through restartPolicyFor().
-  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
-
-  // The actual behavior is installed with context.become below.
-  def receive: Receive = null
-
-  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
-  context.become(waitForMasterState)
-
-  /** Initial behavior: restore the persisted master state, stashing everything else. */
-  def waitForMasterState: Receive = {
-    case GetKVSuccess(_, result) =>
-      val masterState = result.asInstanceOf[MasterState]
-      if (masterState != null) {
-        this.nextAppId = masterState.maxId + 1
-        this.activeAppMasters = masterState.activeAppMasters
-        this.deadAppMasters = masterState.deadAppMasters
-        this.appMasterRegistry = masterState.appMasterRegistry
-      }
-      context.become(receiveHandler)
-      unstashAll()
-    case GetKVFailed(ex) =>
-      LOG.error("Failed to get master state, shutting down master to avoid data corruption...",
-        ex)
-      context.parent ! PoisonPill
-    case msg =>
-      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
-      stash()
-  }
-
-  /** Main behavior, installed once the master state has been restored. */
-  def receiveHandler: Receive = {
-    val msg = "Application Manager started. Ready for application submission..."
-    LOG.info(msg)
-    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
-      appDataStoreService orElse terminationWatch
-  }
-
-  /** Requests that originate from clients (forwarded by the Master). */
-  def clientMsgHandler: Receive = {
-    case SubmitApplication(app, jar, username) =>
-      LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...")
-      val client = sender()
-      if (applicationNameExist(app.name)) {
-        client ! SubmitApplicationResult(Failure(
-          new Exception(s"Application name ${app.name} already existed")))
-      } else {
-        context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent,
-          Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
-
-        val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null)
-        appMasterRestartPolicies += nextAppId ->
-          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
-        kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
-        nextAppId += 1
-      }
-
-    case RestartApplication(appId) =>
-      val client = sender()
-      (kvService ? GetKV(appId.toString, APP_STATE)).onComplete {
-        case Success(GetKVSuccess(_, result)) =>
-          val appState = result.asInstanceOf[ApplicationState]
-          if (appState != null) {
-            LOG.info(s"Shutting down the application (restart), $appId")
-            self ! ShutdownApplication(appId)
-            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
-          } else {
-            client ! SubmitApplicationResult(Failure(
-              new Exception(s"Failed to restart, because the application $appId does not exist.")
-            ))
-          }
-        case _ =>
-          // GetKVFailed, an unexpected reply, or an ask timeout: always answer the client
-          // instead of leaving it waiting.
-          client ! SubmitApplicationResult(Failure(
-            new Exception(s"Unable to obtain the Master State. " +
-              s"Application $appId will not be restarted.")
-          ))
-      }
-
-    case ShutdownApplication(appId) =>
-      LOG.info(s"App Manager Shutting down application $appId")
-      // Only applications that are registered and not already dead can be shut down.
-      val liveAppInfo = appMasterRegistry.get(appId).collect {
-        case (_, info) if !deadAppMasters.contains(info.appId) => info
-      }
-      liveAppInfo match {
-        case Some(info) =>
-          val worker = info.worker
-          val workerPath = Option(worker).map(_.path).orNull
-          LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID")
-          cleanApplicationData(appId)
-          val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
-            s"AppMaster $appId shutdown requested by master...")
-          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut())
-          sender() ! ShutdownApplicationResult(Success(appId))
-        case None =>
-          val errorMsg = s"Failed to find registration information for appId: $appId"
-          LOG.error(errorMsg)
-          sender() ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
-      }
-
-    case ResolveAppId(appId) =>
-      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
-      if (null != appMaster) {
-        sender() ! ResolveAppIdResult(Success(appMaster))
-      } else {
-        sender() ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
-      }
-
-    case AppMastersDataRequest =>
-      val appMastersData = appMasterRegistry.toList.map {
-        case (id, (appMaster, info)) =>
-          val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
-          val workerPath = Option(info.worker).map(worker =>
-            ActorUtil.getFullPath(context.system, worker.path))
-          AppMasterData(
-            getAppMasterStatus(id), id, info.appName, appMasterPath, workerPath.orNull,
-            info.submissionTime, info.startTime, info.finishTime, info.user)
-      }
-      sender() ! AppMastersData(appMastersData)
-
-    case QueryAppMasterConfig(appId) =>
-      val config =
-        if (appMasterRegistry.contains(appId)) {
-          val (_, info) = appMasterRegistry(appId)
-          info.config
-        } else {
-          null
-        }
-      sender() ! AppMasterConfig(config)
-
-    case appMasterDataRequest: AppMasterDataRequest =>
-      val appId = appMasterDataRequest.appId
-      val appStatus = getAppMasterStatus(appId)
-
-      appStatus match {
-        case AppMasterNonExist =>
-          sender() ! AppMasterData(AppMasterNonExist)
-        case _ =>
-          val (appMaster, info) = appMasterRegistry(appId)
-          val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path)
-          val workerPath = Option(info.worker).map(
-            worker => ActorUtil.getFullPath(context.system, worker.path)).orNull
-          sender() ! AppMasterData(
-            appStatus, appId, info.appName, appMasterPath, workerPath,
-            info.submissionTime, info.startTime, info.finishTime, info.user)
-      }
-  }
-
-  /** Replies from workers to ShutdownExecutor requests; only logged. */
-  def workerMessage: Receive = {
-    case ShutdownExecutorSucceed(appId, executorId) =>
-      LOG.info(s"Shut down executor $executorId for application $appId successfully")
-    case failed: ShutdownExecutorFailed =>
-      LOG.error(failed.reason)
-  }
-
-  /** Active, inactive (dead), pending (registered only) or non-existent. */
-  private def getAppMasterStatus(appId: Int): AppMasterStatus = {
-    if (activeAppMasters.contains(appId)) {
-      AppMasterActive
-    } else if (deadAppMasters.contains(appId)) {
-      AppMasterInActive
-    } else if (appMasterRegistry.contains(appId)) {
-      AppMasterPending
-    } else {
-      AppMasterNonExist
-    }
-  }
-
-  private def shutDownExecutorTimeOut(): Unit = {
-    LOG.error("Shut down executor time out")
-  }
-
-  /** Registration and activation of AppMasters; both persist the updated master state. */
-  def appMasterMessage: Receive = {
-    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
-      val startTime = System.currentTimeMillis()
-      val register = registerBack.copy(startTime = startTime)
-
-      LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
-      context.watch(appMaster)
-      appMasterRegistry += register.appId -> (appMaster, register)
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-      sender() ! AppMasterRegistered(register.appId)
-
-    case ActivateAppMaster(appId) =>
-      LOG.info(s"Activate AppMaster for app $appId")
-      activeAppMasters += appId
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-      sender() ! AppMasterActivated(appId)
-  }
-
-  /**
-   * Stores and reads per-application data in the KV service (group = appId) on behalf of
-   * AppMasters. Every request is answered, including when the KV request times out.
-   */
-  def appDataStoreService: Receive = {
-    case SaveAppData(appId, key, value) =>
-      val client = sender()
-      (kvService ? PutKV(appId.toString, key, value)).onComplete {
-        case Success(PutKVSuccess) =>
-          client ! AppDataSaved
-        case _ =>
-          // PutKVFailed, an unexpected reply, or an ask timeout
-          client ! SaveAppDataFailed
-      }
-    case GetAppData(appId, key) =>
-      val client = sender()
-      (kvService ? GetKV(appId.toString, key)).onComplete {
-        case Success(GetKVSuccess(_, value)) =>
-          client ! GetAppDataResult(key, value)
-        case _ =>
-          // GetKVFailed, an unexpected reply, or an ask timeout
-          client ! GetAppDataResult(key, null)
-      }
-  }
-
-  /**
-   * Handles termination of a watched AppMaster by asking for recovery of its application.
-   * Applications that were deliberately shut down (present in deadAppMasters) are skipped.
-   */
-  def terminationWatch: Receive = {
-    case terminate: Terminated =>
-      LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " +
-        s"network down: ${terminate.getAddressTerminated}")
-
-      // Now we assume that the only normal way to stop the application is submitting a
-      // ShutdownApplication request
-      val terminatedAppId = appMasterRegistry.collectFirst {
-        case (appId, (actorRef, _)) if actorRef.compareTo(terminate.actor) == 0 => appId
-      }
-      terminatedAppId match {
-        case Some(appId) if deadAppMasters.contains(appId) =>
-          LOG.info(s"Application $appId has been shut down, it will not be recovered")
-        case Some(appId) =>
-          (kvService ? GetKV(appId.toString, APP_STATE)).onComplete {
-            case Success(GetKVSuccess(_, result)) =>
-              val appState = result.asInstanceOf[ApplicationState]
-              if (appState != null) {
-                LOG.info(s"Recovering application, $appId")
-                self ! RecoverApplication(appState)
-              } else {
-                LOG.error(s"Cannot find application state for $appId")
-              }
-            case _ =>
-              // GetKVFailed, an unexpected reply, or an ask timeout
-              LOG.error(s"Cannot find master state to recover application $appId")
-          }
-        case None =>
-          // Not an AppMaster registered with this AppManager; nothing to recover.
-      }
-  }
-
-  /** Messages the AppManager sends to itself. */
-  def selfMsgHandler: Receive = {
-    case RecoverApplication(state) =>
-      val appId = state.appId
-      if (restartPolicyFor(appId).allowRestart) {
-        LOG.info(s"AppManager Recovering Application $appId...")
-        activeAppMasters -= appId
-        kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-          MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-        context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username,
-          context.parent, None), s"launcher${appId}_${Util.randInt()}")
-      } else {
-        LOG.error(s"Application $appId failed too many times")
-      }
-  }
-
-  case class RecoverApplication(applicationStatus: ApplicationState)
-
-  /**
-   * Returns the restart policy of `appId`. When none exists (e.g. the application was submitted
-   * before this AppManager restored its state from the KV service), a default policy is
-   * created and remembered instead of failing.
-   */
-  private def restartPolicyFor(appId: Int): RestartPolicy = {
-    appMasterRestartPolicies.getOrElse(appId, {
-      val policy = new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
-      appMasterRestartPolicies += appId -> policy
-      policy
-    })
-  }
-
-  /**
-   * Marks `appId` as dead, records its finish time, persists the new master state and deletes
-   * the application's KV group. Does nothing for applications that were never registered.
-   */
-  private def cleanApplicationData(appId: Int): Unit = {
-    appMasterRegistry.get(appId).foreach { case (ref, info) =>
-      // Add the dead app to dead appMasters
-      deadAppMasters += appId
-      // Remove the dead app from active appMasters
-      activeAppMasters -= appId
-      // A shut-down application is never recovered, so its restart policy is no longer needed.
-      appMasterRestartPolicies -= appId
-
-      val finished = (ref, info.copy(finishTime = System.currentTimeMillis()))
-      appMasterRegistry += appId -> finished
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters))
-      kvService ! DeleteKVGroup(appId.toString)
-    }
-  }
-
-  /** True if a registered application that is not dead already uses `appName`. */
-  private def applicationNameExist(appName: String): Boolean = {
-    appMasterRegistry.values.exists { case (_, info) =>
-      info.appName == appName && !deadAppMasters.contains(info.appId)
-    }
-  }
-}
-
-/** Companion of AppManager: KV keys and the state it persists. */
-object AppManager {
-  /** Key, inside an application's own KV group (its appId), of its ApplicationState. */
-  final val APP_STATE = "app_state"
-
-  /** Key, inside the master KV group, under which MasterState is stored. */
-  final val MASTER_STATE = "master_state"
-
-  /**
-   * AppManager state written to the KV service and read back when an AppManager starts.
-   *
-   * @param maxId the AppManager's next application id at the time the state was saved
-   * @param appMasterRegistry appId -> (AppMaster actor, runtime info)
-   * @param activeAppMasters ids of applications whose AppMaster has been activated
-   * @param deadAppMasters ids of applications that have been shut down
-   */
-  case class MasterState(
-      maxId: Int,
-      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
-      activeAppMasters: Set[Int],
-      deadAppMasters: Set[Int])
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
deleted file mode 100644
index 3e54214..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration.Duration
-
-import akka.actor._
-import akka.cluster.Cluster
-import akka.cluster.ddata.Replicator._
-import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
-import org.slf4j.Logger
-
-import org.apache.gearpump.util.LogUtil
-
-/**
- * A replicated simple in-memory KV service. The replications are stored on all masters.
- *
- * Each group is stored as one Akka Distributed Data LWWMap. Reads use ReadMajority and writes
- * WriteMajority, both with a 15 second timeout. The requester and key travel through the
- * replicator as the request context, so replies go back to the original sender.
- */
-class InMemoryKVService extends Actor with Stash {
-  import org.apache.gearpump.cluster.master.InMemoryKVService._
-
-  private val KV_SERVICE = "gearpump_kvservice"
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val replicator = DistributedData(context.system).replicator
-  // Required implicitly when adding entries to the LWWMap in the PutKV handler.
-  private implicit val cluster = Cluster(context.system)
-
-  // Optimize write path, we can tolerate one master down for recovery.
-  private val timeout = Duration(15, TimeUnit.SECONDS)
-  private val readMajority = ReadMajority(timeout)
-  private val writeMajority = WriteMajority(timeout)
-
-  /** Distributed-data key holding every entry of `group`. */
-  private def groupKey(group: String): LWWMapKey[Any] = {
-    LWWMapKey[Any](KV_SERVICE + "_" + group)
-  }
-
-  def receive: Receive = kvService
-
-  def kvService: Receive = {
-
-    case GetKV(group: String, key: String) =>
-      val request = Request(sender(), key)
-      replicator ! Get(groupKey(group), readMajority, Some(request))
-    case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      val appData = success.get(group)
-      LOG.info(s"Successfully retrieved group: ${group.id}")
-      request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull)
-    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      // A group that was never written is reported as a null value, not as a failure.
-      LOG.info(s"We cannot find group ${group.id}")
-      request.client ! GetKVSuccess(request.key, null)
-    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      val error = s"Failed to get application data, the request key is ${request.key}"
-      LOG.error(error)
-      request.client ! GetKVFailed(new Exception(error))
-
-    case PutKV(group: String, key: String, value: Any) =>
-      val request = Request(sender(), key)
-      val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map =>
-        map + (key -> value)
-      }
-      replicator ! update
-    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      request.client ! PutKVSuccess
-    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
-      request.client ! PutKVFailed(request.key, new Exception(error, cause))
-    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
-      request.client ! PutKVFailed(request.key,
-        new TimeoutException(s"Timed out writing key ${request.key} of group ${group.id}"))
-
-    case DeleteKVGroup(group: String) =>
-      // Fire-and-forget: the requester is not told about the outcome.
-      replicator ! Delete(groupKey(group), writeMajority)
-    case DeleteSuccess(group) =>
-      LOG.info(s"KV Group ${group.id} is deleted")
-    case ReplicationDeleteFailure(group) =>
-      LOG.error(s"Failed to delete KV Group ${group.id}...")
-    case DataDeleted(group) =>
-      // NOTE(review): this message carries no request context. If the replicator answers a
-      // Get/Update on a deleted group with DataDeleted, the requester gets no reply and only
-      // sees its own ask time out - confirm against the Akka version in use.
-      LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
-  }
-}
-
-/** Message protocol spoken by the InMemoryKVService actor. */
-object InMemoryKVService {
-  /** Read `key` from `group`; answered with a GetKVResult. */
-  case class GetKV(group: String, key: String)
-
-  /** Write `value` under `key` in `group`; answered with a PutKVResult. */
-  case class PutKV(group: String, key: String, value: Any)
-
-  /** Delete the whole `group`; no reply is sent to the requester. */
-  case class DeleteKVGroup(group: String)
-
-  /** Replies to GetKV. */
-  trait GetKVResult
-
-  /** Replies to PutKV. */
-  trait PutKVResult
-
-  /** `value` is null when the group or the key does not exist. */
-  case class GetKVSuccess(key: String, value: Any) extends GetKVResult
-
-  case class GetKVFailed(ex: Throwable) extends GetKVResult
-
-  case object PutKVSuccess extends PutKVResult
-
-  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult
-
-  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult
-
-  /** Request context passed through the replicator: who asked, and for which key. */
-  case class Request(client: ActorRef, key: String)
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
deleted file mode 100644
index 6b4df07..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import java.lang.management.ManagementFactory
-import org.apache.gearpump.cluster.worker.WorkerId
-import org.apache.gearpump.jarstore.JarStoreServer
-
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-
-import akka.actor._
-import akka.remote.DisassociatedEvent
-import com.typesafe.config.Config
-import org.apache.commons.lang.exception.ExceptionUtils
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster._
-import org.apache.gearpump.cluster.ClientToMaster._
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.MasterToAppMaster._
-import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
-import org.apache.gearpump.cluster.MasterToWorker._
-import org.apache.gearpump.cluster.WorkerToMaster._
-import org.apache.gearpump.cluster.master.InMemoryKVService._
-import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.metrics.Metrics.ReportMetrics
-import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import org.apache.gearpump.util._
-
-/**
- * Master Actor who manages resources of the whole cluster.
- * It is like the resource manager of YARN.
- *
- * On construction it asks the KV service for the persisted next worker id and stashes every
- * other message until the answer arrives; if it cannot be read, the parent is stopped to avoid
- * data corruption. Application bookkeeping is delegated to the AppManager child and resource
- * scheduling to the configured scheduler child.
- */
-private[cluster] class Master extends Actor with Stash {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-  private val systemConfig: Config = context.system.settings.config
-  private implicit val timeout = Constants.FUTURE_TIMEOUT
-  private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
-  // Resources and resourceRequests can be dynamically constructed by
-  // heartbeat of worker and appmaster when master singleton is migrated.
-  // We don't need to persist them in cluster
-  private var appManager: ActorRef = null
-
-  private var scheduler: ActorRef = null
-
-  private var workers = new immutable.HashMap[ActorRef, WorkerId]
-
-  private val birth = System.currentTimeMillis()
-
-  private var nextWorkerId = 0
-
-  // The actual behavior is installed with context.become below.
-  def receive: Receive = null
-
-  // Register jvm metrics
-  Metrics(context.system).register(new JvmMetricsSet("master"))
-
-  LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...")
-
-  val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
-
-  private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath))
-
-  private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)
-
-  // Maintain the list of active masters.
-  private var masters: List[MasterNode] = {
-    // Add myself into the list of initial masters.
-    List(MasterNode(hostPort.host, hostPort.port))
-  }
-
-  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-
-  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-  val historyMetricsService = if (metricsEnabled) {
-    val service =
-      context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))
-
-    val metricsReportService = context.actorOf(
-      Props(new MetricsReporterService(Metrics(context.system))))
-    service.tell(ReportMetrics, metricsReportService)
-    Some(service)
-  } else {
-    None
-  }
-
-  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
-  context.become(waitForNextWorkerId)
-
-  /** Initial behavior: restore the next worker id, stashing everything else. */
-  def waitForNextWorkerId: Receive = {
-    case GetKVSuccess(_, result) =>
-      if (result != null) {
-        this.nextWorkerId = result.asInstanceOf[Int]
-      } else {
-        LOG.warn("Cannot find existing state in the distributed cluster...")
-      }
-      context.become(receiveHandler)
-      unstashAll()
-    case GetKVFailed(ex) =>
-      LOG.error("Failed to get worker id, shutting down master to avoid data corruption...",
-        ex)
-      context.parent ! PoisonPill
-    case msg =>
-      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
-      stash()
-  }
-
-  /** Main behavior, installed once the next worker id has been restored. */
-  def receiveHandler: Receive = workerMsgHandler orElse
-    appMasterMsgHandler orElse
-    onMasterListChange orElse
-    clientMsgHandler orElse
-    metricsService orElse
-    jarStoreService orElse
-    terminationWatch orElse
-    disassociated orElse
-    kvServiceMsgHandler orElse
-    ActorUtil.defaultMsgHandler(self)
-
-  def workerMsgHandler: Receive = {
-    case RegisterNewWorker =>
-      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
-      nextWorkerId += 1
-      // Persist the next free id; waitForNextWorkerId reads it back when a master starts.
-      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
-      val workerHostname = ActorUtil.getHostname(sender())
-      LOG.info(s"Register new worker from $workerHostname ....")
-      self forward RegisterWorker(workerId)
-
-    case RegisterWorker(id) =>
-      val worker = sender()
-      context.watch(worker)
-      worker ! WorkerRegistered(id, MasterInfo(self, birth))
-      scheduler forward WorkerRegistered(id, MasterInfo(self, birth))
-      workers += (worker -> id)
-      val workerHostname = ActorUtil.getHostname(worker)
-      LOG.info(s"Register Worker with id $id from $workerHostname ....")
-    case resourceUpdate: ResourceUpdate =>
-      scheduler forward resourceUpdate
-  }
-
-  def jarStoreService: Receive = {
-    case GetJarStoreServer =>
-      jarStore forward GetJarStoreServer
-  }
-
-  /** Replies to the fire-and-forget PutKV requests this Master sends. */
-  def kvServiceMsgHandler: Receive = {
-    case PutKVSuccess =>
-      // Skip
-    case PutKVFailed(key, exception) =>
-      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
-        ExceptionUtils.getStackTrace(exception))
-  }
-
-  def metricsService: Receive = {
-    case query: QueryHistoryMetrics =>
-      historyMetricsService match {
-        case Some(service) =>
-          service forward query
-        case None =>
-          // Returns empty metrics so that we don't hang the UI
-          sender() ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
-      }
-  }
-
-  def appMasterMsgHandler: Receive = {
-    case request: RequestResource =>
-      scheduler forward request
-    case registerAppMaster: RegisterAppMaster =>
-      appManager forward registerAppMaster
-    case activateAppMaster: ActivateAppMaster =>
-      appManager forward activateAppMaster
-    case save: SaveAppData =>
-      appManager forward save
-    case get: GetAppData =>
-      appManager forward get
-    case GetAllWorkers =>
-      sender() ! WorkerList(workers.values.toList)
-    case GetMasterData =>
-      val aliveFor = System.currentTimeMillis() - birth
-      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
-      val userDir = System.getProperty("user.dir")
-
-      val masterDescription =
-        MasterSummary(
-          MasterNode(hostPort.host, hostPort.port),
-          masters,
-          aliveFor,
-          logFileDir,
-          jarStoreRootPath,
-          MasterStatus.Synced,
-          userDir,
-          List.empty[MasterActivity],
-          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
-          historyMetricsConfig = getHistoryMetricsConfig
-        )
-
-      sender() ! MasterData(masterDescription)
-
-    case invalidAppMaster: InvalidAppMaster =>
-      appManager forward invalidAppMaster
-  }
-
-  import scala.util.{Failure, Success}
-
-  def onMasterListChange: Receive = {
-    case MasterListUpdated(masters: List[MasterNode]) =>
-      this.masters = masters
-  }
-
-  def clientMsgHandler: Receive = {
-    case app: SubmitApplication =>
-      LOG.debug(s"Receive from client, SubmitApplication $app")
-      appManager.forward(app)
-    case app: RestartApplication =>
-      LOG.debug(s"Receive from client, RestartApplication $app")
-      appManager.forward(app)
-    case app: ShutdownApplication =>
-      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
-      scheduler ! ApplicationFinished(app.appId)
-      appManager.forward(app)
-    case app: ResolveAppId =>
-      LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
-      appManager.forward(app)
-    case resolve: ResolveWorkerId =>
-      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
-      val worker = workers.collectFirst {
-        case (workerRef, workerId) if workerId == resolve.workerId => workerRef
-      }
-      worker match {
-        case Some(workerRef) => sender() ! ResolveWorkerIdResult(Success(workerRef))
-        case None => sender() ! ResolveWorkerIdResult(Failure(
-          new Exception(s"cannot find worker ${resolve.workerId}")))
-      }
-    case AppMastersDataRequest =>
-      LOG.debug("Master received AppMastersDataRequest")
-      appManager forward AppMastersDataRequest
-    case appMasterDataRequest: AppMasterDataRequest =>
-      LOG.debug("Master received AppMasterDataRequest")
-      appManager forward appMasterDataRequest
-    case query: QueryAppMasterConfig =>
-      LOG.debug("Master received QueryAppMasterConfig")
-      appManager forward query
-    case QueryMasterConfig =>
-      sender() ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-  }
-
-  def disassociated: Receive = {
-    case disassociated: DisassociatedEvent =>
-      LOG.info(s" disassociated ${disassociated.remoteAddress}")
-  }
-
-  /** A watched worker terminated: tell the scheduler and forget the worker. */
-  def terminationWatch: Receive = {
-    case t: Terminated =>
-      val actor = t.actor
-      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason? " +
-        t.getAddressTerminated())
-
-      LOG.info("Let's filter out dead resources...")
-      // Filters out dead worker resource
-      workers.get(actor).foreach { workerId =>
-        scheduler ! WorkerTerminated(workerId)
-        workers -= actor
-      }
-  }
-
-  override def preStart(): Unit = {
-    val path = ActorUtil.getFullPath(context.system, self.path)
-    LOG.info(s"master path is $path")
-    val schedulerClass = Class.forName(
-      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))
-
-    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
-      classOf[AppManager].getSimpleName)
-    scheduler = context.actorOf(Props(schedulerClass))
-    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
-  }
-}
-
-/** Constants and messages shared by the Master actor and its collaborators. */
-object Master {
-  /** KV group holding master-wide state. */
-  final val MASTER_GROUP = "master_group"
-
-  /** Key, inside MASTER_GROUP, of the next worker id to hand out. */
-  final val WORKER_ID = "next_worker_id"
-
-  /** Identifies a Master: its actor and start time. */
-  case class MasterInfo(master: ActorRef, startTime: Long = 0L)
-
-  object MasterInfo {
-    /** A MasterInfo whose `master` is null. */
-    def empty: MasterInfo = MasterInfo(null)
-  }
-
-  /** Notify the subscriber that master actor list has been updated */
-  case class MasterListUpdated(masters: List[MasterNode])
-
-  /** Sent by the Master to the scheduler when a registered worker terminates. */
-  case class WorkerTerminated(workerId: WorkerId)
-
-  case class SlotStatus(totalSlots: Int, availableSlots: Int)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
deleted file mode 100644
index 1429694..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.collection.mutable
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
-import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
-import org.apache.gearpump.cluster.scheduler.Relaxation._
-import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
-
-/**
- * Assigns resources to applications based on the priority of the application.
- *
- * Pending requests live in a priority queue. Each scheduling round works on a snapshot of the
- * known worker resources. Requests that cannot be (fully) satisfied are re-queued at the end
- * of the round.
- */
-class PriorityScheduler extends Scheduler {
-  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
-
-  /**
-   * Request ordering. `PriorityQueue` dequeues the maximum, so a higher `priority.id` is served
-   * first. For equal priority, the request with the smaller `timeStamp` (older) is served first.
-   */
-  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
-    override def compare(x: PendingRequest, y: PendingRequest): Int = {
-      val byPriority = x.request.priority.id - y.request.priority.id
-      if (byPriority != 0) byPriority else y.timeStamp.compareTo(x.timeStamp)
-    }
-  }
-
-  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler
-
-  /**
-   * Serves queued requests until the queue is empty or the round has allocated as many slots
-   * as the snapshot contained. Each request is handled according to its relaxation:
-   *  - ANY: spread across workers via [[allocateFairly]]; any shortfall is re-queued.
-   *  - ONEWORKER: the first worker with enough free slots serves the whole request.
-   *  - SPECIFICWORKER: only `request.workerId` may serve the whole request.
-   */
-  override def allocateResource(): Unit = {
-    val scheduleLater = mutable.ArrayBuffer.empty[PendingRequest]
-    val resourcesSnapShot = resources.clone()
-    var allocated = Resource.empty
-    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
-
-    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
-      val pending = resourceRequests.dequeue()
-      val PendingRequest(appId, appMaster, request, timeStamp) = pending
-      request.relaxation match {
-        case ANY =>
-          val allocations = allocateFairly(resourcesSnapShot, request)
-          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
-          if (allocations.nonEmpty) {
-            appMaster ! ResourceAllocated(allocations.toArray)
-          }
-          if (newAllocated < request.resource) {
-            // Partially served: queue the remainder for a later round.
-            val remainingRequest = request.resource - newAllocated
-            val remainingExecutors = request.executorNum - allocations.length
-            val newResourceRequest = request.copy(resource = remainingRequest,
-              executorNum = remainingExecutors)
-            scheduleLater += PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
-          }
-          allocated = allocated + newAllocated
-        case ONEWORKER =>
-          // ">=": a worker whose free slots exactly match the request can serve it.
-          val availableResource = resourcesSnapShot.find { case (_, (_, resource)) =>
-            resource.slots >= request.resource.slots
-          }
-          availableResource match {
-            case Some((workerId, (worker, resource))) =>
-              allocated = allocated + request.resource
-              appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
-                workerId)))
-              resourcesSnapShot.update(workerId, (worker, resource - request.resource))
-            case None =>
-              scheduleLater += pending
-          }
-        case SPECIFICWORKER =>
-          resourcesSnapShot.get(request.workerId) match {
-            case Some((worker, availableResource))
-              if availableResource.slots >= request.resource.slots =>
-              appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
-                request.workerId)))
-              allocated = allocated + request.resource
-              resourcesSnapShot.update(request.workerId, (worker,
-                availableResource - request.resource))
-            case _ =>
-              scheduleLater += pending
-          }
-      }
-    }
-    resourceRequests ++= scheduleLater
-  }
-
-  /** Queues an incoming `RequestResource` (replying to its sender) and runs a round. */
-  def resourceRequestHandler: Receive = {
-    case RequestResource(appId, request) =>
-      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
-        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
-      val appMaster = sender()
-      resourceRequests.enqueue(PendingRequest(appId, appMaster, request,
-        System.currentTimeMillis()))
-      allocateResource()
-  }
-
-  /** Drops every pending request that belongs to `appId`. */
-  override def doneApplication(appId: Int): Unit = {
-    resourceRequests = resourceRequests.filter(_.appId != appId)
-  }
-
-  /**
-   * Spreads `request` over the workers in `resources`, at most `request.executorNum`
-   * allocations per pass. The `resources` map is updated in place with what was taken.
-   *
-   * @return the per-worker allocations made (empty if nothing could be allocated)
-   */
-  private def allocateFairly(
-      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
-    : List[ResourceAllocation] = {
-    val workerNum = resources.size
-    var allocations = List.empty[ResourceAllocation]
-    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
-    var remainingRequest = request.resource
-    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
-
-    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
-      val executorNum = Math.min(workerNum, remainingExecutors)
-      val toRequest = Resource(remainingRequest.slots * executorNum / remainingExecutors)
-
-      // Pick the `executorNum` workers with the most free slots.
-      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
-      val pickedResources = sortedResources.take(executorNum)
-
-      // Interleave the picked workers' slots round-robin, so that taking a prefix spreads
-      // the allocation across them.
-      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
-        val ((workerId, (worker, resource)), index) = workerWithIndex
-        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
-      }.sortBy(_._2).map(_._1)
-
-      if (flattenResource.length < toRequest.slots) {
-        // Can not satisfy the user's requirements
-        totalAvailable = Resource.empty
-      } else {
-        flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
-          toArray.foreach { params =>
-            val ((workerId, worker), slots) = params
-            resources.update(workerId, (worker, resources(workerId)._2 - Resource(slots)))
-            allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
-          }
-        totalAvailable -= toRequest
-        remainingRequest -= toRequest
-        remainingExecutors -= executorNum
-      }
-    }
-    allocations
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
deleted file mode 100644
index 7187c1a..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.scheduler
-
-import org.apache.gearpump.cluster.worker.WorkerId
-
-import scala.collection.mutable
-
-import akka.actor.{Actor, ActorRef}
-import org.slf4j.Logger
-
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
-import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
-import org.apache.gearpump.cluster.master.Master.WorkerTerminated
-import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Scheduler schedules resources for different applications.
- *
- * It keeps, per registered worker id, the worker actor and the [[Resource]] that worker last
- * reported. Concrete schedulers decide how pending requests are served by implementing
- * [[allocateResource]] and [[doneApplication]].
- */
-abstract class Scheduler extends Actor {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]
-
-  /** Handles worker registration, resource updates, worker termination and app completion. */
-  def handleScheduleMessage: Receive = {
-    case WorkerRegistered(id, _) =>
-      // The worker starts with an empty Resource; a later ResourceUpdate fills it in.
-      if (!resources.contains(id)) {
-        LOG.info(s"Worker $id added to the scheduler")
-        resources.put(id, (sender(), Resource.empty))
-      }
-    case update@ResourceUpdate(worker, workerId, resource) =>
-      LOG.info(s"$update...")
-      resources.get(workerId) match {
-        case Some((_, previous)) =>
-          // Re-run allocation only when the worker now reports more than it had before.
-          val resourceReturned = resource > previous
-          resources.update(workerId, (worker, resource))
-          if (resourceReturned) {
-            allocateResource()
-          }
-          sender() ! UpdateResourceSucceed
-        case None =>
-          sender() ! UpdateResourceFailed(
-            s"ResourceUpdate failed! The worker $workerId has not been registered into master")
-      }
-    case WorkerTerminated(workerId) =>
-      // Removing an absent key is a no-op, so no contains() guard is needed.
-      resources -= workerId
-    case ApplicationFinished(appId) =>
-      doneApplication(appId)
-  }
-
-  /** Tries to serve pending requests from the currently known `resources`. */
-  def allocateResource(): Unit
-
-  /** Forgets any state kept for application `appId`. */
-  def doneApplication(appId: Int): Unit
-}
-
-/** Messages and records used by [[Scheduler]] implementations. */
-object Scheduler {
-  /** Tells the scheduler that application `appId` has finished. */
-  case class ApplicationFinished(appId: Int)
-
-  /**
-   * A resource request that has not been served yet.
-   *
-   * @param appId id of the requesting application
-   * @param appMaster actor that receives the `ResourceAllocated` reply
-   * @param request the requested resource and placement constraints
-   * @param timeStamp when the request was queued; orders requests of equal priority
-   */
-  case class PendingRequest(
-      appId: Int,
-      appMaster: ActorRef,
-      request: ResourceRequest,
-      timeStamp: TimeStamp)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
deleted file mode 100644
index b4e6f9e..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.worker
-
-import java.io.File
-
-import com.typesafe.config.Config
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.util.{LogUtil, RichProcess, Util}
-
-/**
- * Launcher that starts each executor as a separate process through `Util.startProcess`.
- *
- * @param config worker configuration; this launcher keeps it but does not read it
- */
-class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  /**
-   * Spawns the executor process from `options`, `classPath`, `mainClass` and `arguments`.
-   * `executorId` is only logged; `appId`, `resource` and `config` are not used here.
-   */
-  override def createProcess(
-      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
-      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
-    val joinedClassPath = classPath.mkString(File.pathSeparator)
-    LOG.info(s"Launch executor $executorId, classpath: $joinedClassPath")
-    Util.startProcess(options, classPath, mainClass, arguments)
-  }
-
-  /** No-op: this launcher keeps no per-executor state to release. */
-  override def cleanProcess(appId: Int, executorId: Int): Unit = ()
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
deleted file mode 100644
index 1b52e5d..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
+++ /dev/null
@@ -1,581 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.worker
-
-import java.io.File
-import java.lang.management.ManagementFactory
-import java.net.URL
-import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
-
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.SupervisorStrategy.Stop
-import akka.actor._
-import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import org.apache.gearpump.cluster.AppMasterToWorker._
-import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
-import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
-import org.apache.gearpump.cluster.MasterToWorker._
-import org.apache.gearpump.cluster.WorkerToAppMaster._
-import org.apache.gearpump.cluster.WorkerToMaster._
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
-import org.apache.gearpump.metrics.Metrics.ReportMetrics
-import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import org.apache.gearpump.util.ActorSystemBooter.Daemon
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import org.apache.gearpump.util.{TimeOutScheduler, _}
-
-/**
- * Worker is used to track the resource on a single machine; it is like
- * the node manager of YARN.
- *
- * Lifecycle:
- *  - `preStart` sends `RegisterNewWorker` and waits in [[waitForMasterConfirm]] for
- *    `WorkerRegistered`, after which it switches to [[service]].
- *  - If the watched master terminates, the worker goes back to waiting and re-sends
- *    `RegisterWorker` every 2 seconds.
- *  - In both waiting states the worker stops itself if not confirmed within 30 seconds.
- *
- * @param masterProxy masterProxy is used to resolve the master
- */
-private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
-  private val systemConfig: Config = context.system.settings.config
-
-  private val address = ActorUtil.getFullPath(context.system, self.path)
-  // Free (not yet granted) resource on this worker.
-  private var resource = Resource.empty
-  // Resource currently granted to each executor watcher child.
-  private var allocatedResources = Map[ActorRef, Resource]()
-  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
-  private var id: WorkerId = WorkerId.unspecified
-  private val createdTime = System.currentTimeMillis()
-  // Set on WorkerRegistered; null until then.
-  private var masterInfo: MasterInfo = null
-  // Executor actor name -> executor watcher child; pruned when the child terminates.
-  private var executorNameToActor = Map.empty[String, ActorRef]
-  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
-  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
-
-  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
-  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
-
-  private var totalSlots: Int = 0
-
-  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
-  var historyMetricsService: Option[ActorRef] = None
-
-  // Never used directly: preStart installs the real behavior with context.become.
-  override def receive: Receive = null
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  /** Behavior once registered with a master. */
-  def service: Receive =
-    appMasterMsgHandler orElse
-      clientMessageHandler orElse
-      metricsService orElse
-      terminationWatch(masterInfo.master) orElse
-      ActorUtil.defaultMsgHandler(self)
-
-  /** Answers history-metrics queries, or returns an empty result when metrics are off. */
-  def metricsService: Receive = {
-    case query: QueryHistoryMetrics =>
-      if (historyMetricsService.isEmpty) {
-        // Returns empty metrics so that we don't hang the UI
-        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
-      } else {
-        historyMetricsService.get forward query
-      }
-  }
-
-  private var metricsInitialized = false
-
-  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
-
-  /** Registers JVM metrics and, if enabled, starts the history/reporter metric actors. */
-  private def initializeMetrics(): Unit = {
-    // Registers jvm metrics
-    val metricsSetName = "worker" + WorkerId.render(id)
-    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
-
-    historyMetricsService = if (metricsEnabled) {
-      val historyMetricsService = {
-        context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
-      }
-
-      val metricsReportService = context.actorOf(Props(
-        new MetricsReporterService(Metrics(context.system))))
-      historyMetricsService.tell(ReportMetrics, metricsReportService)
-      Some(historyMetricsService)
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Waits for `WorkerRegistered`. On receipt it records the id and master, cancels
-   * `timeoutTicker`, reports the current resource and switches to [[service]].
-   */
-  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
-
-    // If master get disconnected, the WorkerRegistered may be triggered multiple times.
-    case WorkerRegistered(id, masterInfo) =>
-      this.id = id
-
-      // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
-      // itself.
-      if (!metricsInitialized) {
-        initializeMetrics()
-        metricsInitialized = true
-      }
-
-      this.masterInfo = masterInfo
-      timeoutTicker.cancel()
-      context.watch(masterInfo.master)
-      this.LOG = LogUtil.getLogger(getClass, worker = id)
-      LOG.info(s"Worker is registered. " +
-        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
-      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
-        resourceUpdateTimeoutMs, updateResourceTimeOut())
-      context.become(service)
-  }
-
-  private def updateResourceTimeOut(): Unit = {
-    LOG.error(s"Update worker resource time out")
-  }
-
-  /** Handles executor launch/shutdown/resize requests and master resource-update replies. */
-  def appMasterMsgHandler: Receive = {
-    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
-      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
-      executorNameToActor.get(actorName) match {
-        case Some(executorToStop) =>
-          LOG.info(s"Shutdown executor ${actorName}(${executorToStop.path.toString}) " +
-            s"due to: $reason")
-          executorToStop.forward(shutdown)
-        case None =>
-          LOG.error(s"Cannot find executor $actorName, ignore this message")
-          sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
-      }
-    case launch: LaunchExecutor =>
-      LOG.info(s"$launch")
-      if (resource < launch.resource) {
-        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
-      } else {
-        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
-
-        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
-          jarStoreClient, executorProcLauncher))
-        executorNameToActor += actorName -> executor
-
-        resource = resource - launch.resource
-        allocatedResources = allocatedResources + (executor -> launch.resource)
-
-        reportResourceToMaster()
-        executorsInfo += executor ->
-          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
-        context.watch(executor)
-      }
-    case UpdateResourceFailed(reason, ex) =>
-      LOG.error(reason)
-      context.stop(self)
-    case UpdateResourceSucceed =>
-      LOG.info(s"Update resource succeed")
-    case GetWorkerData(workerId) =>
-      val aliveFor = System.currentTimeMillis() - createdTime
-      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
-      val userDir = System.getProperty("user.dir")
-      sender ! WorkerData(WorkerSummary(
-        id, "active",
-        address,
-        aliveFor,
-        logDir,
-        executorsInfo.values.toArray,
-        totalSlots,
-        resource.slots,
-        userDir,
-        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
-        resourceManagerContainerId = systemConfig.getString(
-          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
-        historyMetricsConfig = getHistoryMetricsConfig)
-      )
-    case ChangeExecutorResource(appId, executorId, usedResource) =>
-      for (executor <- executorActorRef(appId, executorId);
-        allocatedResource <- allocatedResources.get(executor)) {
-
-        allocatedResources += executor -> usedResource
-        resource = resource + allocatedResource - usedResource
-        reportResourceToMaster()
-
-        if (usedResource == Resource(0)) {
-          executorsInfo -= executor
-          allocatedResources -= executor
-          // stop executor if there is no resource binded to it.
-          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
-          executor ! ShutdownExecutor(appId, executorId,
-            "Shutdown executor because the resource used is zero")
-        }
-      }
-  }
-
-  private def reportResourceToMaster(): Unit = {
-    sendMsgWithTimeOutCallBack(masterInfo.master,
-      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
-  }
-
-  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
-    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
-    executorNameToActor.get(actorName)
-  }
-
-  /** Returns this worker's non-default config, or an empty config for another worker id. */
-  def clientMessageHandler: Receive = {
-    case QueryWorkerConfig(workerId) =>
-      if (this.id == workerId) {
-        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-      } else {
-        sender ! WorkerConfig(ConfigFactory.empty)
-      }
-  }
-
-  /** Re-sends RegisterWorker every 2s; stops the worker after `timeOutSeconds`. */
-  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
-    repeatActionUtil(
-      seconds = timeOutSeconds,
-      action = () => {
-        masterProxy ! RegisterWorker(workerId)
-      },
-      onTimeout = () => {
-        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
-          s"seconds, abort and kill the worker...")
-        self ! PoisonPill
-      })
-  }
-
-  /** Reacts to the death of the master or of one of this worker's executor children. */
-  def terminationWatch(master: ActorRef): Receive = {
-    case Terminated(actor) =>
-      if (actor.compareTo(master) == 0) {
-        // The master is down: try to re-register; retryRegisterWorker stops this worker if
-        // no master confirms within 30 seconds.
-        LOG.info(s"Master cannot be contacted, find a new master ...")
-        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
-      } else if (ActorUtil.isChildActorPath(self, actor)) {
-        // One executor is down,
-        LOG.info(s"Executor is down ${getExecutorName(actor)}")
-
-        allocatedResources.get(actor).foreach { allocated =>
-          resource = resource + allocated
-          executorsInfo -= actor
-          allocatedResources = allocatedResources - actor
-          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
-            resourceUpdateTimeoutMs, updateResourceTimeOut())
-        }
-        // Forget the dead executor so later ShutdownExecutor requests get a
-        // ShutdownExecutorFailed reply instead of being forwarded to a dead actor. Filtering
-        // by value keeps a newer executor that re-used the same name.
-        executorNameToActor = executorNameToActor.filter { case (_, ref) => ref != actor }
-      }
-  }
-
-  private def getExecutorName(actorRef: ActorRef): Option[String] = {
-    executorNameToActor.find(_._2 == actorRef).map(_._1)
-  }
-
-  /** Instantiates the configured launcher class through its `(Config)` constructor. */
-  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
-    val launcherClazz = Class.forName(
-      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
-    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
-      .asInstanceOf[ExecutorProcessLauncher]
-  }
-
-  import context.dispatcher
-  override def preStart(): Unit = {
-    LOG.info(s"RegisterNewWorker")
-    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
-    this.resource = Resource(totalSlots)
-    masterProxy ! RegisterNewWorker
-    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
-  }
-
-  /** Only arms the timeout: no periodic action, stop the worker after `seconds`. */
-  private def registerTimeoutTicker(seconds: Int): Cancellable = {
-    repeatActionUtil(seconds, () => (), () => {
-      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
-        s"abort and kill the worker...")
-      self ! PoisonPill
-    })
-  }
-
-  /**
-   * Runs `action` now and every 2 seconds, and `onTimeout` once after `seconds`. The
-   * returned Cancellable cancels both schedules.
-   */
-  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
-    : Cancellable = {
-    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
-      Duration(2, TimeUnit.SECONDS))(action())
-    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
-    new Cancellable {
-      def cancel(): Boolean = {
-        val result1 = cancelTimeout.cancel()
-        val result2 = cancelSuicide.cancel()
-        result1 && result2
-      }
-
-      def isCancelled: Boolean = {
-        cancelTimeout.isCancelled && cancelSuicide.isCancelled
-      }
-    }
-  }
-
-  override def postStop(): Unit = {
-    LOG.info(s"Worker is going down....")
-    ioPool.shutdown()
-    context.system.terminate()
-  }
-}
-
-/** Helpers used by [[Worker]]: the per-executor watcher actor and executor handles. */
-private[cluster] object Worker {
-
-  /** Outcome of an executor, delivered to the watcher when its exit value completes. */
-  case class ExecutorResult(result: Try[Int])
-
-  /**
-   * Child actor of the worker that owns one executor. The executor runs either in-JVM or as a
-   * separate process, depending on GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS. The
-   * watcher stops itself when the executor exits or when it is told to shut the executor down.
-   */
-  class ExecutorWatcher(
-      launch: LaunchExecutor,
-      masterInfo: MasterInfo,
-      ioPool: ExecutionContext,
-      jarStoreClient: JarStoreClient,
-      procLauncher: ExecutorProcessLauncher) extends Actor {
-    import launch.{appId, executorId, resource}
-
-    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
-
-    // The folders are under ${GEARPUMP_HOME}
-    // Declared before `executorHandler`: vals initialize in textual order, and the Future
-    // started by createProcess reads this on another thread during construction.
-    val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
-      File.separator + "yarn")
-
-    val executorConfig: Config = {
-      val workerConfig = context.system.settings.config
-
-      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
-        Option(jvmConfig.executorAkkaConfig)
-      }.getOrElse(ConfigFactory.empty())
-
-      resolveExecutorConfig(workerConfig, submissionConfig)
-    }
-
-    // For some config, worker has priority, for others, user Application submission config
-    // have priorities.
-    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
-      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
-        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
-        .withoutPath(GEARPUMP_HOME)
-        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
-        .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
-        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
-        // Falls back to workerConfig
-        .withFallback(workerConfig)
-
-      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
-      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
-      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
-        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
-        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
-      } else {
-        config
-      }
-
-      // Excludes reference.conf, and JVM properties..
-      ClusterConfig.filterOutDefaultConfig(updatedConf)
-    }
-
-    implicit val executorService = ioPool
-
-    private val executorHandler = {
-      val ctx = launch.executorJvmConfig
-
-      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
-        new ExecutorHandler {
-          val exitPromise = Promise[Int]()
-          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
-
-          override def destroy(): Unit = {
-            context.stop(app)
-          }
-          override def exitValue: Future[Int] = {
-            exitPromise.future
-          }
-        }
-      } else {
-        createProcess(ctx)
-      }
-    }
-
-    /**
-     * Asynchronously (on `ioPool`) prepares the app jar and config temp files and starts the
-     * executor process. The returned handler destroys the process (once) and deletes those
-     * files. Its exit value fails when the process exits non-zero.
-     */
-    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
-
-      val process = Future {
-        val jarPath = ctx.jar.map { appJar =>
-          val tempFile = File.createTempFile(appJar.name, ".jar")
-          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
-          val file = new URL("file:" + tempFile)
-          file.getFile
-        }
-
-        val configFile = {
-          val configFile = File.createTempFile("gearpump", ".conf")
-          ClusterConfig.saveConfig(executorConfig, configFile)
-          val file = new URL("file:" + configFile)
-          file.getFile
-        }
-
-        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
-          ctx.classPath.map(path => expandEnvironment(path)) ++
-          jarPath.map(Array(_)).getOrElse(Array.empty[String])
-
-        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
-        val logArgs = List(
-          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
-          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
-          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormattedTime(masterInfo.startTime)}",
-          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
-        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
-
-        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
-
-        // Remote debug executor process
-        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
-        val remoteDebugConfig = if (remoteDebugFlag) {
-          val availablePort = Util.findFreePort().get
-          List(
-            "-Xdebug",
-            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
-            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
-          )
-        } else {
-          List.empty[String]
-        }
-
-        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
-        val verboseGCConfig = if (verboseGCFlag) {
-          List(
-            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
-            "-verbose:gc",
-            "-XX:+PrintGCDetails",
-            "-XX:+PrintGCDateStamps",
-            "-XX:+PrintTenuringDistribution",
-            "-XX:+PrintGCApplicationConcurrentTime",
-            "-XX:+PrintGCApplicationStoppedTime"
-          )
-        } else {
-          List.empty[String]
-        }
-
-        val ipv4 = List(s"-D${PREFER_IPV4}=true")
-
-        val options = ctx.jvmArguments ++ username ++
-          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
-
-        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
-          options, classPath, ctx.mainClass, ctx.arguments)
-
-        ProcessInfo(process, jarPath, configFile)
-      }
-
-      new ExecutorHandler {
-
-        var destroyed = false
-
-        override def destroy(): Unit = {
-          LOG.info(s"Destroy executor process ${ctx.mainClass}")
-          if (!destroyed) {
-            destroyed = true
-            process.foreach { info =>
-              info.process.destroy()
-              info.jarPath.foreach(new File(_).delete())
-              new File(info.configFile).delete()
-            }
-          }
-        }
-
-        override def exitValue: Future[Int] = {
-          process.flatMap { info =>
-            val exit = info.process.exitValue()
-            if (exit == 0) {
-              Future.successful(0)
-            } else {
-              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
-                s"error summary: ${info.process.logger.error}"))
-            }
-          }
-        }
-      }
-    }
-
-    private def expandEnvironment(path: String): String = {
-      // TODO: extend this to support more environment.
-      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
-    }
-
-    override def preStart(): Unit = {
-      executorHandler.exitValue.onComplete { value =>
-        procLauncher.cleanProcess(appId, executorId)
-        val result = ExecutorResult(value)
-        self ! result
-      }
-    }
-
-    override def postStop(): Unit = {
-      executorHandler.destroy()
-    }
-
-    override def receive: Receive = {
-      case ShutdownExecutor(appId, executorId, reason: String) =>
-        executorHandler.destroy()
-        sender ! ShutdownExecutorSucceed(appId, executorId)
-        context.stop(self)
-      case ExecutorResult(executorResult) =>
-        executorResult match {
-          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
-          case Failure(e) => LOG.error("Executor exit with errors", e)
-        }
-        context.stop(self)
-    }
-
-    private def getFormattedTime(timestamp: Long): String = {
-      val datePattern = "yyyy-MM-dd-HH-mm"
-      val format = new java.text.SimpleDateFormat(datePattern)
-      format.format(timestamp)
-    }
-
-    /** Drops class-path entries that contain any of the `daemonPathPattern` folders. */
-    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
-      classPath.filterNot(matchDaemonPattern(_))
-    }
-
-    private def matchDaemonPattern(path: String): Boolean = {
-      daemonPathPattern.exists(path.contains(_))
-    }
-  }
-
-  /** Handle to a running executor: stop it, or observe its exit value. */
-  trait ExecutorHandler {
-    def destroy(): Unit
-    def exitValue: Future[Int]
-  }
-
-  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
-
-  /**
-   * Starts the executor in the same JVM as worker.
-   * `exit` is completed with the first child failure, or with 0 when this actor stops.
-   */
-  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
-    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
-    private val exitCode = 0
-
-    override val supervisorStrategy =
-      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
-        case ex: Throwable =>
-          LOG.error(s"system $name stopped ", ex)
-          // tryFailure: a later failing child must not throw IllegalStateException from
-          // inside the decider because the promise is already completed.
-          exit.tryFailure(ex)
-          Stop
-      }
-
-    override def postStop(): Unit = {
-      // No-op if a failure was already recorded.
-      exit.trySuccess(exitCode)
-    }
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
deleted file mode 100644
index a6b75cb..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.pattern.ask
-import akka.testkit.TestActorRef
-import com.typesafe.config.ConfigValueFactory
-
-import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
-import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
-import org.apache.gearpump.cluster.master.Master
-import org.apache.gearpump.cluster.worker.Worker
-import org.apache.gearpump.util.Constants
-
- /**
- * Test fixture that starts a Master and a single Worker in one ActorSystem whose
- * Netty hostname is 127.0.0.1. Construction blocks until the master reports at
- * least one registered worker.
- */
- class MiniCluster {
- private val mockMasterIP = "127.0.0.1"
-
- implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
- withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
-
- val (mockMaster, worker) = {
- val master = system.actorOf(Props(classOf[Master]), "master")
- val worker = system.actorOf(Props(classOf[Worker], master), "worker")
-
- // Block until the worker has registered itself with the master.
- waitUntilWorkerIsRegistered(master)
- (master, worker)
- }
-
- /** Creates a TestActorRef for `props` on this cluster's actor system. */
- def launchActor(props: Props): TestActorRef[Actor] = {
- TestActorRef(props)
- }
-
- /** Polls the master until it reports at least one worker. */
- private def waitUntilWorkerIsRegistered(master: ActorRef): Unit = {
- // Pause between polls. Spinning would flood the master with GetAllWorkers
- // asks and burn a CPU core while the worker is still starting.
- while (!isWorkerRegistered(master)) {
- Thread.sleep(100)
- }
- }
-
- /** Asks the master once for its worker list and reports whether it is non-empty. */
- private def isWorkerRegistered(master: ActorRef): Boolean = {
- import scala.concurrent.duration._
-
- implicit val futureTimeout = Constants.FUTURE_TIMEOUT
-
- // mapTo fails the future with a descriptive ClassCastException on an
- // unexpected reply, instead of relying on an unchecked cast.
- val workerListFuture = (master ? GetAllWorkers).mapTo[WorkerList]
-
- val workers = Await.result(workerListFuture, 15.seconds)
- workers.workers.nonEmpty
- }
-
- /** Terminates the actor system and blocks until termination completes. */
- def shutDown(): Unit = {
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
deleted file mode 100644
index 90fdd39..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import java.util.Properties
-
-import akka.testkit.TestProbe
-import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.transport.HostPort
-
-import scala.concurrent.Future
-import scala.util.{Success, Try}
-
-import com.typesafe.config.{ConfigFactory, Config}
-import org.scalatest._
-
-import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
-import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
-import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.{Constants, LogUtil, Util}
-
- /**
- * Tests for the command-line entry points in this package: Worker, Master, Info,
- * Kill, Replay, Local and Gear. startActorSystem / shutdownActorSystem (from
- * MasterHarness) run around each test.
- */
- class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
-
- private val LOG = LogUtil.getLogger(getClass)
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- override def beforeEach(): Unit = {
- startActorSystem()
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- "Worker" should "register worker address to master when started." in {
-
- val masterReceiver = createMockMaster()
-
- val tempTestConf = convertTestConf(getHost, getPort)
-
- val options = Array(
- s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
- s"-D${PREFER_IPV4}=true"
- ) ++ getMasterListOption()
-
- val worker = Util.startProcess(options,
- getContextClassPath,
- getMainClassName(Worker),
- Array.empty)
-
- try {
- masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
- } finally {
- worker.destroy()
- // Remove the generated config on every path, not only on success, so a
- // timed-out expectation does not leak the temp file.
- tempTestConf.delete()
- }
- }
-
- "Master" should "accept worker RegisterNewWorker when started" in {
- val worker = TestProbe()(getActorSystem)
-
- val host = "127.0.0.1"
- val port = Util.findFreePort().get
-
- val properties = new Properties()
- properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
- properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
- val masterConfig = ConfigFactory.parseProperties(properties)
- .withFallback(TestUtil.MASTER_CONFIG)
- Future {
- Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
- }
-
- val masterProxy = getActorSystem.actorOf(
- MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
-
- worker.send(masterProxy, RegisterNewWorker)
- worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
- }
-
- "Info" should "be started without exception" in {
-
- val masterReceiver = createMockMaster()
-
- Future {
- org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
- }
-
- masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
- masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
- }
-
- "Kill" should "be started without exception" in {
-
- val masterReceiver = createMockMaster()
-
- Future {
- Kill.main(masterConfig, Array("-appid", "0"))
- }
-
- masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
- masterReceiver.reply(ShutdownApplicationResult(Success(0)))
- }
-
- "Replay" should "be started without exception" in {
-
- val masterReceiver = createMockMaster()
-
- Future {
- Replay.main(masterConfig, Array("-appid", "0"))
- }
-
- masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
- masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
- masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
- masterReceiver.reply(ReplayApplicationResult(Success(0)))
- }
-
- "Local" should "be started without exception" in {
- val port = Util.findFreePort().get
- val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
- s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
- s"-D${PREFER_IPV4}=true")
-
- val local = Util.startProcess(options,
- getContextClassPath,
- getMainClassName(Local),
- Array.empty)
-
- // Re-evaluates `fn` once per second until it returns true or `times` retries are used up.
- def retry(times: Int)(fn: => Boolean): Boolean = {
-
- LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
-
- val result = fn
- if (result || times <= 0) {
- result
- } else {
- Thread.sleep(1000)
- retry(times - 1)(fn)
- }
- }
-
- try {
- assert(retry(10)(isPortUsed("127.0.0.1", port)),
- "local is not started successfully, as port is not used " + port)
- } finally {
- local.destroy()
- }
- }
-
- "Gear" should "support app|info|kill|shell|replay" in {
-
- val commands = Array("app", "info", "kill", "shell", "replay")
-
- assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
-
- for (command <- commands) {
- // NOTE(review): `command` is only used in the failure message. Every
- // iteration runs Gear.main(Array("-noexist")). Passing
- // Array(command, "-noexist") would exercise each sub-command's option
- // parsing; confirm that each of them throws before changing this.
- assert(Try(Gear.main(Array("-noexist"))).isFailure,
- "pass unknown option, throw, command: " + command)
- }
-
- assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
-
- val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
- assert(tryThis.isFailure, "unknown command, throw")
- }
- }
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
deleted file mode 100644
index e1ba8f6..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster.main
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.TestUtil
-
- /** Checks that a MasterWatcher stops itself when it cannot get a quorum. */
- class MasterWatcherSpec extends FlatSpec with Matchers {
- def config: Config = TestUtil.MASTER_CONFIG
-
- "MasterWatcher" should "kill itself when can not get a quorum" in {
- val system = ActorSystem("ForMasterWatcher", config)
- try {
- val actorWatcher = TestProbe()(system)
-
- val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
- actorWatcher watch masterWatcher
- actorWatcher.expectTerminated(masterWatcher, 5.seconds)
- } finally {
- // Shut the system down even when the expectation times out, so a failing
- // run does not leak an ActorSystem into later tests.
- system.terminate()
- Await.result(system.whenTerminated, Duration.Inf)
- }
- }
- }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
deleted file mode 100644
index 58e3593..0000000
--- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.master
-
-import scala.util.Success
-
-import akka.actor.{Actor, ActorRef, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.Config
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
-import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
-import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
-import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
-import org.apache.gearpump.cluster.master.AppManager._
-import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
-import org.apache.gearpump.cluster.{TestUtil, _}
-import org.apache.gearpump.util.LogUtil
-
- /**
- * Tests for AppManager. TestProbes stand in for the KV service and, via
- * DummyAppMasterLauncherFactory, for the AppMaster launcher.
- */
- class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
- var kvService: TestProbe = null
- var haService: TestProbe = null
- var appLauncher: TestProbe = null
- var appManager: ActorRef = null
- private val LOG = LogUtil.getLogger(getClass)
-
- override def config: Config = TestUtil.DEFAULT_CONFIG
-
- override def beforeEach(): Unit = {
- startActorSystem()
- kvService = TestProbe()(getActorSystem)
- appLauncher = TestProbe()(getActorSystem)
-
- appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
- new DummyAppMasterLauncherFactory(appLauncher))))
- // On startup AppManager reads MASTER_STATE from the KV service; answer it
- // with an initial MasterState.
- kvService.expectMsgType[GetKV]
- kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
- }
-
- override def afterEach(): Unit = {
- shutdownActorSystem()
- }
-
- "AppManager" should "handle AppMaster message correctly" in {
- val appMaster = TestProbe()(getActorSystem)
- val appId = 1
-
- val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
- appMaster.send(appManager, register)
- appMaster.expectMsgType[AppMasterRegistered]
-
- appMaster.send(appManager, ActivateAppMaster(appId))
- appMaster.expectMsgType[AppMasterActivated]
- }
-
- "DataStoreService" should "support Put and Get" in {
- val appMaster = TestProbe()(getActorSystem)
- appMaster.send(appManager, SaveAppData(0, "key", 1))
- kvService.expectMsgType[PutKV]
- kvService.reply(PutKVSuccess)
- appMaster.expectMsg(AppDataSaved)
-
- appMaster.send(appManager, GetAppData(0, "key"))
- kvService.expectMsgType[GetKV]
- kvService.reply(GetKVSuccess("key", 1))
- appMaster.expectMsg(GetAppDataResult("key", 1))
- }
-
- "AppManager" should "support application submission and shutdown" in {
- testClientSubmission(withRecover = false)
- }
-
- "AppManager" should "support application submission and recover if appmaster dies" in {
- LOG.info("=================testing recover==============")
- testClientSubmission(withRecover = true)
- }
-
- "AppManager" should "handle client message correctly" in {
- val mockClient = TestProbe()(getActorSystem)
- // expectMsgType reports a wrong reply type as an assertion failure rather
- // than a ClassCastException from a manual cast.
- mockClient.send(appManager, ShutdownApplication(1))
- assert(mockClient.expectMsgType[ShutdownApplicationResult].appId.isFailure)
-
- mockClient.send(appManager, ResolveAppId(1))
- assert(mockClient.expectMsgType[ResolveAppIdResult].appMaster.isFailure)
-
- mockClient.send(appManager, AppMasterDataRequest(1))
- mockClient.expectMsg(AppMasterData(AppMasterNonExist))
- }
-
- "AppManager" should "reject the application submission if the app name already existed" in {
- val app = TestUtil.dummyApp
- val submit = SubmitApplication(app, None, "username")
- val client = TestProbe()(getActorSystem)
- val appMaster = TestProbe()(getActorSystem)
- val appId = 1
-
- client.send(appManager, submit)
-
- kvService.expectMsgType[PutKV]
- appLauncher.expectMsg(LauncherStarted(appId))
- appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
- AppMasterRuntimeInfo(appId, app.name)))
- appMaster.expectMsgType[AppMasterRegistered]
-
- client.send(appManager, submit)
- assert(client.expectMsgType[SubmitApplicationResult].appId.isFailure)
- }
-
- /**
- * Submits an application, registers its AppMaster and queries it through the
- * client API.
- *
- * @param withRecover when false, the application is shut down through the
- * client. When true, the AppMaster probe is stopped and the test expects
- * AppManager to read APP_STATE from the KV service and start a new launcher.
- */
- def testClientSubmission(withRecover: Boolean): Unit = {
- val app = TestUtil.dummyApp
- val submit = SubmitApplication(app, None, "username")
- val client = TestProbe()(getActorSystem)
- val appMaster = TestProbe()(getActorSystem)
- val appId = 1
-
- client.send(appManager, submit)
-
- kvService.expectMsgType[PutKV]
- appLauncher.expectMsg(LauncherStarted(appId))
- appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
- AppMasterRuntimeInfo(appId, app.name)))
- kvService.expectMsgType[PutKV]
- appMaster.expectMsgType[AppMasterRegistered]
-
- client.send(appManager, ResolveAppId(appId))
- client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
-
- client.send(appManager, AppMastersDataRequest)
- client.expectMsgType[AppMastersData]
-
- client.send(appManager, AppMasterDataRequest(appId, false))
- client.expectMsgType[AppMasterData]
-
- if (!withRecover) {
- client.send(appManager, ShutdownApplication(appId))
- client.expectMsg(ShutdownApplicationResult(Success(appId)))
- } else {
- // Do recovery
- getActorSystem.stop(appMaster.ref)
- kvService.expectMsgType[GetKV]
- val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
- kvService.reply(GetKVSuccess(APP_STATE, appState))
- appLauncher.expectMsg(LauncherStarted(appId))
- }
- }
- }
-
- /** AppMasterLauncherFactory whose launchers are DummyAppMasterLauncher actors bound to `test`; all other arguments are ignored. */
- class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
- override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
- username: String, master: ActorRef, client: Option[ActorRef]): Props =
- Props(new DummyAppMasterLauncher(test, appId))
- }
-
- /**
- * Stand-in AppMaster launcher. On construction it sends LauncherStarted(appId)
- * to the probe; after that it forwards every message it receives to the probe.
- */
- class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
-
- test.ref ! LauncherStarted(appId)
-
- override def receive: Receive = {
- case message => test.ref.forward(message)
- }
- }
-
- /** Sent by DummyAppMasterLauncher to its probe when the launcher is constructed. */
- case class LauncherStarted(
- appId: Int)