You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/14 11:55:24 UTC
[3/4] incubator-gearpump git commit: [GEARPUMP-224] merge
gearpump-daemon to gearpump-core
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
new file mode 100644
index 0000000..447b034
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import java.io.File
+import java.lang.management.ManagementFactory
+import java.net.URL
+import java.util.concurrent.{Executors, TimeUnit}
+
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor._
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
+import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceSucceed, UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster._
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher
+import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
+import org.apache.gearpump.jarstore.JarStoreClient
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
+import org.apache.gearpump.util.ActorSystemBooter.Daemon
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+import org.apache.gearpump.util.{TimeOutScheduler, _}
+import org.slf4j.Logger
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Worker is used to track the resource (slots) on a single machine and to launch executors
+ * for AppMasters on it; it is like the node manager of YARN.
+ *
+ * On start it sends `RegisterNewWorker` to the master proxy and waits for `WorkerRegistered`.
+ * If the master it registered to terminates, it keeps sending `RegisterWorker` with its
+ * existing id until a master confirms, and stops itself if none does within the timeout.
+ *
+ * @param masterProxy masterProxy is used to resolve the master
+ */
+private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
+  private val systemConfig: Config = context.system.settings.config
+
+  private val address = ActorUtil.getFullPath(context.system, self.path)
+  // Resource on this machine that is not allocated to any executor.
+  private var resource = Resource.empty
+  // Resource currently granted to each executor watcher actor.
+  private var allocatedResources = Map[ActorRef, Resource]()
+  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
+  private var id: WorkerId = WorkerId.unspecified
+  private val createdTime = System.currentTimeMillis()
+  // Set when the master confirms the registration; null before that.
+  private var masterInfo: MasterInfo = null
+  // Executor actor name (ActorUtil.actorNameForExecutor) -> executor watcher actor.
+  private var executorNameToActor = Map.empty[String, ActorRef]
+  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
+  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)
+
+  // Handed to every ExecutorWatcher as the execution context for its futures.
+  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
+  private val resourceUpdateTimeoutMs = 30000 // Milliseconds
+
+  private var totalSlots: Int = 0
+
+  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+  var historyMetricsService: Option[ActorRef] = None
+
+  // The actual behavior is installed with `context.become` in `preStart`.
+  override def receive: Receive = null
+  var LOG: Logger = LogUtil.getLogger(getClass)
+
+  /** Behavior once the worker has been registered to a master. */
+  def service: Receive =
+    appMasterMsgHandler orElse
+      clientMessageHandler orElse
+      metricsService orElse
+      terminationWatch(masterInfo.master) orElse
+      ActorUtil.defaultMsgHandler(self)
+
+  /**
+   * Forwards history metrics queries to the history metrics service, or answers with empty
+   * metrics when no such service is running.
+   */
+  def metricsService: Receive = {
+    case query: QueryHistoryMetrics =>
+      historyMetricsService match {
+        case Some(metricsActor) =>
+          metricsActor forward query
+        case None =>
+          // Returns empty metrics so that we don't hang the UI
+          sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
+      }
+  }
+
+  private var metricsInitialized = false
+
+  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
+
+  /**
+   * Registers the JVM metrics of this worker and, when metrics are enabled, starts the history
+   * metrics service together with a reporter service.
+   */
+  private def initializeMetrics(): Unit = {
+    // Registers jvm metrics
+    val metricsSetName = "worker" + WorkerId.render(id)
+    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))
+
+    historyMetricsService = if (metricsEnabled) {
+      val historyService = context.actorOf(
+        Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
+      val metricsReportService = context.actorOf(Props(
+        new MetricsReporterService(Metrics(context.system))))
+      historyService.tell(ReportMetrics, metricsReportService)
+      Some(historyService)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Waits for the master to confirm the registration, then switches to [[service]].
+   *
+   * @param timeoutTicker retry/timeout timers to cancel once the registration is confirmed
+   */
+  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {
+
+    // If master get disconnected, the WorkerRegistered may be triggered multiple times.
+    case WorkerRegistered(workerId, info) =>
+      this.id = workerId
+
+      // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
+      // itself.
+      if (!metricsInitialized) {
+        initializeMetrics()
+        metricsInitialized = true
+      }
+
+      this.masterInfo = info
+      timeoutTicker.cancel()
+      context.watch(info.master)
+      this.LOG = LogUtil.getLogger(getClass, worker = workerId)
+      LOG.info(s"Worker is registered. " +
+        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
+      sendMsgWithTimeOutCallBack(info.master, ResourceUpdate(self, workerId, resource),
+        resourceUpdateTimeoutMs, updateResourceTimeOut())
+      context.become(service)
+  }
+
+  private def updateResourceTimeOut(): Unit = {
+    LOG.error(s"Update worker resource time out")
+  }
+
+  /** Handles executor launch/shutdown/resize requests and master replies to resource updates. */
+  def appMasterMsgHandler: Receive = {
+    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
+      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      executorNameToActor.get(actorName) match {
+        case Some(executorToStop) =>
+          LOG.info(s"Shutdown executor ${actorName}(${executorToStop.path.toString}) " +
+            s"due to: $reason")
+          executorToStop.forward(shutdown)
+        case None =>
+          LOG.error(s"Cannot find executor $actorName, ignore this message")
+          sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
+      }
+    case launch: LaunchExecutor =>
+      LOG.info(s"$launch")
+      if (resource < launch.resource) {
+        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
+      } else {
+        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)
+
+        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
+          jarStoreClient, executorProcLauncher))
+        executorNameToActor += actorName -> executor
+
+        resource = resource - launch.resource
+        allocatedResources = allocatedResources + (executor -> launch.resource)
+
+        reportResourceToMaster()
+        executorsInfo += executor ->
+          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
+        context.watch(executor)
+      }
+    case UpdateResourceFailed(reason, ex) =>
+      LOG.error(reason)
+      context.stop(self)
+    case UpdateResourceSucceed =>
+      LOG.info(s"Update resource succeed")
+    case GetWorkerData(workerId) =>
+      val aliveFor = System.currentTimeMillis() - createdTime
+      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
+      val userDir = System.getProperty("user.dir")
+      sender ! WorkerData(WorkerSummary(
+        id, "active",
+        address,
+        aliveFor,
+        logDir,
+        executorsInfo.values.toArray,
+        totalSlots,
+        resource.slots,
+        userDir,
+        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
+        resourceManagerContainerId = systemConfig.getString(
+          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
+        historyMetricsConfig = getHistoryMetricsConfig)
+      )
+    case ChangeExecutorResource(appId, executorId, usedResource) =>
+      for (executor <- executorActorRef(appId, executorId);
+        allocatedResource <- allocatedResources.get(executor)) {
+
+        allocatedResources += executor -> usedResource
+        resource = resource + allocatedResource - usedResource
+        reportResourceToMaster()
+
+        if (usedResource == Resource(0)) {
+          executorsInfo -= executor
+          allocatedResources -= executor
+          // Stops the executor if there is no resource bound to it.
+          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
+          executor ! ShutdownExecutor(appId, executorId,
+            "Shutdown executor because the resource used is zero")
+        }
+      }
+  }
+
+  private def reportResourceToMaster(): Unit = {
+    sendMsgWithTimeOutCallBack(masterInfo.master,
+      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
+  }
+
+  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
+    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
+    executorNameToActor.get(actorName)
+  }
+
+  /** Replies with this worker's non-default config, or an empty config for another worker id. */
+  def clientMessageHandler: Receive = {
+    case QueryWorkerConfig(workerId) =>
+      val config =
+        if (this.id == workerId) ClusterConfig.filterOutDefaultConfig(systemConfig)
+        else ConfigFactory.empty
+      sender ! WorkerConfig(config)
+  }
+
+  /**
+   * Sends `RegisterWorker(workerId)` to the master proxy every 2 seconds, and kills the worker
+   * if the returned Cancellable has not been cancelled after `timeOutSeconds`.
+   */
+  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
+    repeatActionUtil(
+      seconds = timeOutSeconds,
+      action = () => {
+        masterProxy ! RegisterWorker(workerId)
+      },
+      onTimeout = () => {
+        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
+          s"seconds, abort and kill the worker...")
+        self ! PoisonPill
+      })
+  }
+
+  /**
+   * Reacts to the termination of the master or of one of this worker's executor watchers.
+   *
+   * @param master the master this worker is currently registered to
+   */
+  def terminationWatch(master: ActorRef): Receive = {
+    case Terminated(actor) =>
+      if (actor.compareTo(master) == 0) {
+        // The master we registered to is down. Keep trying to register with the existing
+        // worker id; retryRegisterWorker kills the worker if no master confirms in time.
+        LOG.info(s"Master cannot be contacted, find a new master ...")
+        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
+      } else if (ActorUtil.isChildActorPath(self, actor)) {
+        // One executor is down
+        val executorName = getExecutorName(actor)
+        LOG.info(s"Executor is down ${executorName.getOrElse(actor.path.toString)}")
+
+        // Forget the name -> actor mapping, otherwise the map grows with every executor ever
+        // launched and a later ShutdownExecutor for this executor would be forwarded to a dead
+        // actor instead of being answered with ShutdownExecutorFailed.
+        executorName.foreach(name => executorNameToActor -= name)
+
+        // Reclaims the resource still allocated to the executor, if any.
+        allocatedResources.get(actor).foreach { allocated =>
+          resource = resource + allocated
+          executorsInfo -= actor
+          allocatedResources = allocatedResources - actor
+          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
+            resourceUpdateTimeoutMs, updateResourceTimeOut())
+        }
+      }
+  }
+
+  private def getExecutorName(actorRef: ActorRef): Option[String] = {
+    executorNameToActor.find(_._2 == actorRef).map(_._1)
+  }
+
+  /** Instantiates the configured ExecutorProcessLauncher through its (Config) constructor. */
+  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
+    val launcherClazz = Class.forName(
+      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
+    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
+      .asInstanceOf[ExecutorProcessLauncher]
+  }
+
+  import context.dispatcher
+  override def preStart(): Unit = {
+    LOG.info(s"RegisterNewWorker")
+    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
+    this.resource = Resource(totalSlots)
+    masterProxy ! RegisterNewWorker
+    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
+  }
+
+  /** Kills the worker unless the returned Cancellable is cancelled within `seconds`. */
+  private def registerTimeoutTicker(seconds: Int): Cancellable = {
+    repeatActionUtil(seconds, () => (), () => {
+      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
+        s"abort and kill the worker...")
+      self ! PoisonPill
+    })
+  }
+
+  /**
+   * Runs `action` immediately and then every 2 seconds, and runs `onTimeout` once after
+   * `seconds`. The returned Cancellable cancels both timers.
+   */
+  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
+    : Cancellable = {
+    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
+      Duration(2, TimeUnit.SECONDS))(action())
+    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
+    new Cancellable {
+      def cancel(): Boolean = {
+        val result1 = cancelTimeout.cancel()
+        val result2 = cancelSuicide.cancel()
+        result1 && result2
+      }
+
+      def isCancelled: Boolean = {
+        cancelTimeout.isCancelled && cancelSuicide.isCancelled
+      }
+    }
+  }
+
+  override def postStop(): Unit = {
+    LOG.info(s"Worker is going down....")
+    ioPool.shutdown()
+    context.system.terminate()
+  }
+}
+
+/**
+ * Companion of [[Worker]]: the actor that launches and watches a single executor, and the small
+ * types it uses.
+ */
+private[cluster] object Worker {
+
+  /** Sent by an [[ExecutorWatcher]] to itself once its executor has finished. */
+  case class ExecutorResult(result: Try[Int])
+
+  /**
+   * Launches one executor for `launch` and watches it.
+   *
+   * The executor runs inside the worker JVM when
+   * GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS is true in the resolved executor config.
+   * Otherwise it runs as a process created by `procLauncher`. The watcher stops itself when the
+   * executor finishes or when it receives `ShutdownExecutor`, and destroys the executor in
+   * `postStop`.
+   *
+   * @param launch the launch request
+   * @param masterInfo its start time is passed to the executor process as a system property
+   * @param ioPool execution context for the process-creation and exit-value futures
+   * @param jarStoreClient used to copy the application jar to a local temp file
+   * @param procLauncher creates executor processes and cleans them up after they exit
+   */
+  class ExecutorWatcher(
+      launch: LaunchExecutor,
+      masterInfo: MasterInfo,
+      ioPool: ExecutionContext,
+      jarStoreClient: JarStoreClient,
+      procLauncher: ExecutorProcessLauncher) extends Actor {
+    import launch.{appId, executorId, resource}
+
+    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)
+
+    // Class path entries containing one of these fragments are removed from the executor class
+    // path. The folders are under ${GEARPUMP_HOME}.
+    // Must be initialized before `executorHandler`: building the handler starts a Future that
+    // reads this field, and a field assigned later in the constructor may still be null there.
+    val daemonPathPattern = List("lib" + File.separator + "yarn")
+
+    val executorConfig: Config = {
+      val workerConfig = context.system.settings.config
+
+      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
+        Option(jvmConfig.executorAkkaConfig)
+      }.getOrElse(ConfigFactory.empty())
+
+      resolveExecutorConfig(workerConfig, submissionConfig)
+    }
+
+    // For some config, worker has priority, for others, user Application submission config
+    // have priorities.
+    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
+      // These keys are always taken from the worker; for the rest the submission config wins.
+      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
+        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
+        .withoutPath(GEARPUMP_HOME)
+        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
+        .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
+        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
+        // Falls back to workerConfig
+        .withFallback(workerConfig)
+
+      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms.
+      // Read as a duration so that both a bare number (milliseconds) and a value with a unit,
+      // such as "10ms", are accepted; getInt rejects the latter.
+      val durationMs = config.getDuration(AKKA_SCHEDULER_TICK_DURATION, TimeUnit.MILLISECONDS)
+      val updatedConf = if (akka.util.Helpers.isWindows && durationMs < 10) {
+        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be at least 10ms, set to 10ms")
+        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
+      } else {
+        config
+      }
+
+      // Excludes reference.conf, and JVM properties..
+      ClusterConfig.filterOutDefaultConfig(updatedConf)
+    }
+
+    // Must stay above `executorHandler`: the futures created there use this implicit.
+    implicit val executorService: ExecutionContext = ioPool
+
+    private val executorHandler: ExecutorHandler = {
+      val ctx = launch.executorJvmConfig
+
+      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
+        new ExecutorHandler {
+          val exitPromise = Promise[Int]()
+          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))
+
+          override def destroy(): Unit = {
+            context.stop(app)
+          }
+          override def exitValue: Future[Int] = {
+            exitPromise.future
+          }
+        }
+      } else {
+        createProcess(ctx)
+      }
+    }
+
+    /**
+     * Asynchronously prepares the jar and config temp files and starts the executor process.
+     * The returned handler destroys the process and deletes those files.
+     */
+    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {
+
+      val process = Future {
+        val jarPath = ctx.jar.map { appJar =>
+          val tempFile = File.createTempFile(appJar.name, ".jar")
+          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
+          val file = new URL("file:" + tempFile)
+          file.getFile
+        }
+
+        val configFile = {
+          val configFile = File.createTempFile("gearpump", ".conf")
+          ClusterConfig.saveConfig(executorConfig, configFile)
+          val file = new URL("file:" + configFile)
+          file.getFile
+        }
+
+        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
+          ctx.classPath.map(path => expandEnvironment(path)) ++
+          jarPath.map(Array(_)).getOrElse(Array.empty[String])
+
+        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
+        val logArgs = List(
+          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
+          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
+          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormattedTime(masterInfo.startTime)}",
+          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
+        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")
+
+        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")
+
+        // Remote debug executor process
+        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
+        val remoteDebugConfig = if (remoteDebugFlag) {
+          val availablePort = Util.findFreePort().get
+          List(
+            "-Xdebug",
+            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
+            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
+        val verboseGCConfig = if (verboseGCFlag) {
+          List(
+            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
+            "-verbose:gc",
+            "-XX:+PrintGCDetails",
+            "-XX:+PrintGCDateStamps",
+            "-XX:+PrintTenuringDistribution",
+            "-XX:+PrintGCApplicationConcurrentTime",
+            "-XX:+PrintGCApplicationStoppedTime"
+          )
+        } else {
+          List.empty[String]
+        }
+
+        val ipv4 = List(s"-D${PREFER_IPV4}=true")
+
+        val options = ctx.jvmArguments ++ username ++
+          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs
+
+        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
+          options, classPath, ctx.mainClass, ctx.arguments)
+
+        ProcessInfo(process, jarPath, configFile)
+      }
+
+      new ExecutorHandler {
+
+        var destroyed = false
+
+        override def destroy(): Unit = {
+          LOG.info(s"Destroy executor process ${ctx.mainClass}")
+          if (!destroyed) {
+            destroyed = true
+            process.foreach { info =>
+              info.process.destroy()
+              info.jarPath.foreach(new File(_).delete())
+              new File(info.configFile).delete()
+            }
+          }
+        }
+
+        override def exitValue: Future[Int] = {
+          process.flatMap { info =>
+            val exit = info.process.exitValue()
+            if (exit == 0) {
+              Future.successful(0)
+            } else {
+              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
+                s"error summary: ${info.process.logger.error}"))
+            }
+          }
+        }
+      }
+    }
+
+    /** Replaces the `<GEARPUMP_HOME>` placeholder in a class path entry. */
+    private def expandEnvironment(path: String): String = {
+      // TODO: extend this to support more environment.
+      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
+    }
+
+    override def preStart(): Unit = {
+      executorHandler.exitValue.onComplete { value =>
+        procLauncher.cleanProcess(appId, executorId)
+        val result = ExecutorResult(value)
+        self ! result
+      }
+    }
+
+    override def postStop(): Unit = {
+      executorHandler.destroy()
+    }
+
+    override def receive: Receive = {
+      case ShutdownExecutor(appId, executorId, reason: String) =>
+        executorHandler.destroy()
+        sender ! ShutdownExecutorSucceed(appId, executorId)
+        context.stop(self)
+      case ExecutorResult(executorResult) =>
+        executorResult match {
+          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
+          case Failure(e) => LOG.error("Executor exit with errors", e)
+        }
+        context.stop(self)
+    }
+
+    private def getFormattedTime(timestamp: Long): String = {
+      val datePattern = "yyyy-MM-dd-HH-mm"
+      val format = new java.text.SimpleDateFormat(datePattern)
+      format.format(timestamp)
+    }
+
+    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
+      classPath.filterNot(matchDaemonPattern(_))
+    }
+
+    private def matchDaemonPattern(path: String): Boolean = {
+      daemonPathPattern.exists(path.contains(_))
+    }
+  }
+
+  /** Handle on a running executor, whether in-JVM or a separate process. */
+  trait ExecutorHandler {
+    def destroy(): Unit
+    def exitValue: Future[Int]
+  }
+
+  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)
+
+  /**
+   * Starts the executor in the same JVM as worker.
+   *
+   * `exit` is completed with the first child failure, or with 0 when this actor stops.
+   */
+  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
+    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
+    private val exitCode = 0
+
+    override val supervisorStrategy =
+      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
+        case ex: Throwable =>
+          LOG.error(s"system $name stopped ", ex)
+          // Several children may fail; `failure` on an already completed promise would throw
+          // IllegalStateException inside the supervisor decider, so only try to complete it.
+          exit.tryFailure(ex)
+          Stop
+      }
+
+    override def postStop(): Unit = {
+      // No-op when a child failure has already completed the promise.
+      exit.trySuccess(exitCode)
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
new file mode 100644
index 0000000..0a22245
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import akka.pattern.ask
+import akka.testkit.TestActorRef
+import com.typesafe.config.ConfigValueFactory
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.master.Master
+import org.apache.gearpump.cluster.worker.Worker
+import org.apache.gearpump.util.Constants
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+/**
+ * A test cluster running in one actor system: a [[Master]] and a single [[Worker]].
+ * Construction blocks until the worker shows up in the master's worker list.
+ */
+class MiniCluster {
+  private val mockMasterIP = "127.0.0.1"
+
+  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
+    withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))
+
+  val (mockMaster, worker) = {
+    val master = system.actorOf(Props(classOf[Master]), "master")
+    val worker = system.actorOf(Props(classOf[Worker], master), "worker")
+
+    // Wait until worker register itself to master
+    waitUntilWorkerIsRegistered(master)
+    (master, worker)
+  }
+
+  /** Creates a [[TestActorRef]] for `props` in this cluster's actor system. */
+  def launchActor(props: Props): TestActorRef[Actor] = {
+    TestActorRef(props)
+  }
+
+  /**
+   * Polls the master until it reports at least one worker.
+   *
+   * Pauses between polls instead of spinning, and fails after a deadline instead of hanging the
+   * test run forever when registration never happens. The values are local because this method
+   * runs while the class is still being constructed.
+   */
+  private def waitUntilWorkerIsRegistered(master: ActorRef): Unit = {
+    import scala.concurrent.duration._
+    val deadline = 60.seconds.fromNow
+    while (!isWorkerRegistered(master)) {
+      if (deadline.isOverdue()) {
+        throw new IllegalStateException("Worker is not registered to master within 60 seconds")
+      }
+      Thread.sleep(100)
+    }
+  }
+
+  /** Asks the master for its worker list; true when it is not empty. */
+  private def isWorkerRegistered(master: ActorRef): Boolean = {
+    import scala.concurrent.duration._
+    implicit val futureTimeout = Constants.FUTURE_TIMEOUT
+
+    val workerListFuture: Future[WorkerList] = (master ? GetAllWorkers).mapTo[WorkerList]
+
+    // Waits for the master's answer.
+    val workers = Await.result(workerListFuture, 15.seconds)
+    workers.workers.nonEmpty
+  }
+
+  def shutDown(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
new file mode 100644
index 0000000..f9b0762
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.{Actor, ActorRef, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager}
+import org.apache.gearpump.cluster.master.AppManager._
+import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.{TestUtil, _}
+import org.apache.gearpump.util.LogUtil
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.util.Success
+
+/**
+ * Tests for [[AppManager]]. The KV store and the AppMaster launcher are replaced by test
+ * probes.
+ */
+class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  var kvService: TestProbe = null
+  // NOTE(review): not used by any test in this spec.
+  var haService: TestProbe = null
+  var appLauncher: TestProbe = null
+  var appManager: ActorRef = null
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    kvService = TestProbe()(getActorSystem)
+    appLauncher = TestProbe()(getActorSystem)
+
+    appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
+      new DummyAppMasterLauncherFactory(appLauncher))))
+    // The freshly started AppManager is expected to query the KV store; answer with an empty
+    // master state.
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty)))
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "AppManager" should "handle AppMaster message correctly" in {
+    val appMaster = TestProbe()(getActorSystem)
+    val appId = 1
+
+    val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName"))
+    appMaster.send(appManager, register)
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    appMaster.send(appManager, ActivateAppMaster(appId))
+    appMaster.expectMsgType[AppMasterActivated]
+  }
+
+  "DataStoreService" should "support Put and Get" in {
+    val appMaster = TestProbe()(getActorSystem)
+    appMaster.send(appManager, SaveAppData(0, "key", 1))
+    kvService.expectMsgType[PutKV]
+    kvService.reply(PutKVSuccess)
+    appMaster.expectMsg(AppDataSaved)
+
+    appMaster.send(appManager, GetAppData(0, "key"))
+    kvService.expectMsgType[GetKV]
+    kvService.reply(GetKVSuccess("key", 1))
+    appMaster.expectMsg(GetAppDataResult("key", 1))
+  }
+
+  "AppManager" should "support application submission and shutdown" in {
+    testClientSubmission(withRecover = false)
+  }
+
+  "AppManager" should "support application submission and recover if appmaster dies" in {
+    LOG.info("=================testing recover==============")
+    testClientSubmission(withRecover = true)
+  }
+
+  "AppManager" should "handle client message correctly" in {
+    val mockClient = TestProbe()(getActorSystem)
+    mockClient.send(appManager, ShutdownApplication(1))
+    assert(mockClient.expectMsgType[ShutdownApplicationResult].appId.isFailure)
+
+    mockClient.send(appManager, ResolveAppId(1))
+    assert(mockClient.expectMsgType[ResolveAppIdResult].appMaster.isFailure)
+
+    mockClient.send(appManager, AppMasterDataRequest(1))
+    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+  }
+
+  "AppManager" should "reject the application submission if the app name already existed" in {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    // Submitting the same application again must fail.
+    client.send(appManager, submit)
+    assert(client.expectMsgType[SubmitApplicationResult].appId.isFailure)
+  }
+
+  /**
+   * Submits an application, registers its AppMaster and queries it. Then it either shuts the
+   * application down, or stops the AppMaster and checks that a new launcher is started.
+   */
+  def testClientSubmission(withRecover: Boolean): Unit = {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+    val appMaster = TestProbe()(getActorSystem)
+    val appId = 1
+
+    client.send(appManager, submit)
+
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
+      AppMasterRuntimeInfo(appId, app.name)))
+    kvService.expectMsgType[PutKV]
+    appMaster.expectMsgType[AppMasterRegistered]
+
+    client.send(appManager, ResolveAppId(appId))
+    client.expectMsg(ResolveAppIdResult(Success(appMaster.ref)))
+
+    client.send(appManager, AppMastersDataRequest)
+    client.expectMsgType[AppMastersData]
+
+    client.send(appManager, AppMasterDataRequest(appId, false))
+    client.expectMsgType[AppMasterData]
+
+    if (!withRecover) {
+      client.send(appManager, ShutdownApplication(appId))
+      client.expectMsg(ShutdownApplicationResult(Success(appId)))
+    } else {
+      // Do recovery
+      getActorSystem.stop(appMaster.ref)
+      kvService.expectMsgType[GetKV]
+      val appState = ApplicationState(appId, "application1", 1, app, None, "username", null)
+      kvService.reply(GetKVSuccess(APP_STATE, appState))
+      appLauncher.expectMsg(LauncherStarted(appId))
+    }
+  }
+}
+
+/**
+ * Launcher factory for tests: ignores everything except `appId` and always builds a
+ * [[DummyAppMasterLauncher]] bound to the given probe.
+ */
+class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory {
+  override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props =
+    Props(new DummyAppMasterLauncher(test, appId))
+}
+
+/**
+ * Stand-in for an AppMaster launcher. On construction it tells the probe `LauncherStarted(appId)`.
+ * After that it forwards every received message to the probe, keeping the original sender.
+ */
+class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor {
+  private val probe = test.ref
+  probe ! LauncherStarted(appId)
+
+  override def receive: Receive = {
+    case message => probe.forward(message)
+  }
+}
+
+/** Sent to the test probe by a DummyAppMasterLauncher when it is created for `appId`. */
+case class LauncherStarted(appId: Int)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
new file mode 100644
index 0000000..d3e739f
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.Props
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.master.InMemoryKVService
+import org.apache.gearpump.cluster.master.InMemoryKVService._
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
+
+import scala.concurrent.duration._
+
+/** Exercises put/get and group deletion on [[InMemoryKVService]]. */
+class InMemoryKVServiceSpec
+  extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  override def beforeEach(): Unit = startActorSystem()
+
+  override def afterEach(): Unit = shutdownActorSystem()
+
+  override def config: Config = TestUtil.MASTER_CONFIG
+
+  "KVService" should "get, put, delete correctly" in {
+    val actorSystem = getActorSystem
+    val service = actorSystem.actorOf(Props(new InMemoryKVService()))
+    val groupName = "group"
+    val silence = 3.seconds
+
+    val probe = TestProbe()(actorSystem)
+
+    // Each put is acknowledged; the second put overwrites the first value.
+    Seq(1, 2).foreach { value =>
+      probe.send(service, PutKV(groupName, "key", value))
+      probe.expectMsg(PutKVSuccess)
+    }
+
+    probe.send(service, GetKV(groupName, "key"))
+    probe.expectMsg(GetKVSuccess("key", 2))
+
+    probe.send(service, DeleteKVGroup(groupName))
+
+    // Once the group has been deleted, Get and Put for that group are no longer answered.
+    probe.send(service, GetKV(groupName, "key"))
+    probe.expectNoMsg(silence)
+
+    probe.send(service, PutKV(groupName, "key", 3))
+    probe.expectNoMsg(silence)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
new file mode 100644
index 0000000..2166976
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import java.util.Properties
+
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _}
+import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult}
+import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
+import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{Constants, LogUtil, Util}
+import org.scalatest._
+
+import scala.concurrent.Future
+import scala.util.{Success, Try}
+
+/** Smoke tests for the command-line entry points (Worker, Master, Info, Kill, Replay, Local, Gear). */
+class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "Worker" should "register worker address to master when started." in {
+
+    val masterReceiver = createMockMaster()
+
+    val tempTestConf = convertTestConf(getHost, getPort)
+
+    val options = Array(
+      s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}",
+      s"-D${PREFER_IPV4}=true"
+    ) ++ getMasterListOption()
+
+    val worker = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Worker),
+      Array.empty)
+
+    try {
+      masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker)
+    } finally {
+      worker.destroy()
+      // Remove the temporary config even when the registration check fails.
+      tempTestConf.delete()
+    }
+  }
+
+  "Master" should "accept worker RegisterNewWorker when started" in {
+    val worker = TestProbe()(getActorSystem)
+
+    val host = "127.0.0.1"
+    val port = Util.findFreePort().get
+
+    val properties = new Properties()
+    properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port")
+    properties.put(s"${GEARPUMP_HOSTNAME}", s"$host")
+    val masterConfig = ConfigFactory.parseProperties(properties)
+      .withFallback(TestUtil.MASTER_CONFIG)
+    Future {
+      Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString))
+    }
+
+    val masterProxy = getActorSystem.actorOf(
+      MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec")
+
+    worker.send(masterProxy, RegisterNewWorker)
+    worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME)
+  }
+
+  "Info" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty)
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
+    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName"))))
+  }
+
+  "Kill" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Kill.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0))
+    masterReceiver.reply(ShutdownApplicationResult(Success(0)))
+  }
+
+  "Replay" should "be started without exception" in {
+
+    val masterReceiver = createMockMaster()
+
+    Future {
+      Replay.main(masterConfig, Array("-appid", "0"))
+    }
+
+    masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref)))
+    masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME)
+    masterReceiver.reply(ReplayApplicationResult(Success(0)))
+  }
+
+  "Local" should "be started without exception" in {
+    val port = Util.findFreePort().get
+    val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port",
+      s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost",
+      s"-D${PREFER_IPV4}=true")
+
+    val local = Util.startProcess(options,
+      getContextClassPath,
+      getMainClassName(Local),
+      Array.empty)
+
+    // Re-evaluates `fn` up to `times` more times, one second apart, until it returns true.
+    @scala.annotation.tailrec
+    def retry(times: Int)(fn: => Boolean): Boolean = {
+
+      LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..")
+
+      val result = fn
+      if (result || times <= 0) {
+        result
+      } else {
+        Thread.sleep(1000)
+        retry(times - 1)(fn)
+      }
+    }
+
+    try {
+      assert(retry(10)(isPortUsed("127.0.0.1", port)),
+        "local is not started successfully, as port is not used " + port)
+    } finally {
+      local.destroy()
+    }
+  }
+
+  "Gear" should "support app|info|kill|shell|replay" in {
+
+    val commands = Array("app", "info", "kill", "shell", "replay")
+
+    assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw")
+
+    // Each sub-command must reject an unknown option; the command has to be passed along,
+    // otherwise "-noexist" itself would be treated as the (unknown) command.
+    for (command <- commands) {
+      assert(Try(Gear.main(Array(command, "-noexist"))).isFailure,
+        "pass unknown option, throw, command: " + command)
+    }
+
+    assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ")
+
+    val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist")))
+    assert(tryThis.isFailure, "unknown command, throw")
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
new file mode 100644
index 0000000..b48fc2a
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.main
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.Config
+import org.apache.gearpump.cluster.TestUtil
+import org.scalatest.{FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class MasterWatcherSpec extends FlatSpec with Matchers {
+  def config: Config = TestUtil.MASTER_CONFIG
+
+  "MasterWatcher" should "kill itself when can not get a quorum" in {
+    val system = ActorSystem("ForMasterWatcher", config)
+
+    try {
+      val actorWatcher = TestProbe()(system)
+
+      // The watcher is expected to stop itself within 5 seconds.
+      val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher"))
+      actorWatcher watch masterWatcher
+      actorWatcher.expectTerminated(masterWatcher, 5.seconds)
+    } finally {
+      // This test owns its actor system: shut it down even if the expectation above fails.
+      system.terminate()
+      Await.result(system.whenTerminated, Duration.Inf)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
new file mode 100644
index 0000000..8a3d7d1
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{ImplicitSender, TestKit, TestProbe}
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL}
+import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.concurrent.duration._
+
+class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender
+  with WordSpecLike with Matchers with BeforeAndAfterAll {
+
+  def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG))
+  val appId = 0
+  val workerId1: WorkerId = WorkerId(1, 0L)
+  val workerId2: WorkerId = WorkerId(2, 0L)
+  val mockAppMaster = TestProbe()
+  val mockWorker1 = TestProbe()
+  val mockWorker2 = TestProbe()
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  "The scheduler" should {
+    "update resource only when the worker is registered" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler ! ResourceUpdate(mockWorker1.ref, workerId1, Resource(100))
+      expectMsg(UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId1 has not been " +
+        s"registered into master"))
+    }
+
+    "drop application's resource requests when the application is removed" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      mockAppMaster.expectNoMsg(5.seconds)
+    }
+  }
+
+  /** True when both messages carry the same allocations, ignoring their order. */
+  def sameElement(left: ResourceAllocated, right: ResourceAllocated): Boolean = {
+    left.allocations.sortBy(_.workerId).sameElements(right.allocations.sortBy(_.workerId))
+  }
+
+  /**
+   * Waits up to 5 seconds for the mock AppMaster to receive a `ResourceAllocated` whose
+   * allocations match `expected` (order-insensitive). Fails the test if no message arrives or the
+   * next message does not match.
+   */
+  private def expectAllocated(expected: ResourceAllocated): Unit = {
+    mockAppMaster.expectMsgPF(5.seconds) {
+      case allocated: ResourceAllocated if sameElement(allocated, expected) => ()
+    }
+  }
+
+  "The resource request with higher priority" should {
+    "be handled first" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, LOW, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, HIGH, Relaxation.ANY)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      // Served by priority: HIGH (30), then NORMAL (20), then the first LOW (40).
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))))
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
+
+      // Worker1 has only 10 left, so the second LOW request is served once worker2 appears.
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
+    }
+  }
+
+  "The resource request which delivered earlier" should {
+    "be handled first if the priorities are the same" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, HIGH, Relaxation.ANY)
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different relaxation" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      val request1 = ResourceRequest(Resource(40), workerId2, HIGH, Relaxation.SPECIFICWORKER)
+      val request2 = ResourceRequest(Resource(20), workerId1, NORMAL, Relaxation.SPECIFICWORKER)
+
+      scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
+
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
+
+      val request3 = ResourceRequest(
+        Resource(30), WorkerId.unspecified, NORMAL, Relaxation.ANY, executorNum = 2)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+
+      expectAllocated(ResourceAllocated(Array(
+        ResourceAllocation(Resource(15), mockWorker1.ref, workerId1),
+        ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))))
+
+      // We have to manually update the resource on each worker
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), NORMAL, Relaxation.ONEWORKER)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+
+      expectAllocated(ResourceAllocated(
+        Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))))
+    }
+  }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different executor number" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      // By default, the request requires only one executor
+      val request2 = ResourceRequest(Resource(20), WorkerId.unspecified)
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations2.allocations.length == 1)
+      assert(allocations2.allocations.head.resource == Resource(20))
+
+      val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations3.allocations.length == 3)
+      assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+      // The total available resource can not satisfy the requirements with executor number
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+      val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations4.allocations.length == 2)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+      // When new resources are available, the remaining request will be satisfied:
+      // the third executor of request4 gets its 20-unit share (60 / 3).
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+      val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations5.allocations.length == 1)
+      assert(allocations5.allocations.forall(_.resource == Resource(20)))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
new file mode 100644
index 0000000..e0233f8
--- /dev/null
+++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.cluster.worker
+
+import akka.actor.{ActorSystem, PoisonPill, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
+import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed}
+import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate}
+import org.apache.gearpump.cluster.master.Master.MasterInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil}
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants}
+import org.scalatest._
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
+  override def config: Config = TestUtil.DEFAULT_CONFIG
+
+  val appId = 1
+  val workerId: WorkerId = WorkerId(1, 0L)
+  val executorId = 1
+  // Recreated in beforeEach against the actor system started for each test.
+  var masterProxy: TestProbe = null
+  var mockMaster: TestProbe = null
+  var client: TestProbe = null
+  val workerSlots = 50
+
+  override def beforeEach(): Unit = {
+    startActorSystem()
+    mockMaster = TestProbe()(getActorSystem)
+    masterProxy = TestProbe()(getActorSystem)
+    client = TestProbe()(getActorSystem)
+  }
+
+  override def afterEach(): Unit = {
+    shutdownActorSystem()
+  }
+
+  "The new started worker" should {
+    "kill itself if no response from Master after registering" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+      mockMaster watch worker
+      mockMaster.expectMsg(RegisterNewWorker)
+      mockMaster.expectTerminated(worker, 60.seconds)
+    }
+  }
+
+  "Worker" should {
+    "init its resource from the gearpump config" in {
+      // Named distinctly so it does not shadow the spec-level `config`.
+      val workerConfig = ConfigFactory
+        .parseString(s"${Constants.GEARPUMP_WORKER_SLOTS} = $workerSlots")
+        .withFallback(TestUtil.DEFAULT_CONFIG)
+      val workerSystem = ActorSystem("WorkerSystem", workerConfig)
+      try {
+        val worker = workerSystem.actorOf(Props(classOf[Worker], mockMaster.ref))
+        mockMaster watch worker
+        mockMaster.expectMsg(RegisterNewWorker)
+
+        worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+        mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(workerSlots)))
+
+        worker.tell(
+          UpdateResourceFailed("Test resource update failed", new Exception()), mockMaster.ref)
+        mockMaster.expectTerminated(worker, 5.seconds)
+      } finally {
+        // This test owns a second actor system; always shut it down, even on failure.
+        workerSystem.terminate()
+        Await.result(workerSystem.whenTerminated, Duration.Inf)
+      }
+    }
+  }
+
+  "Worker" should {
+    "update its remaining resource when launching and shutting down executors" in {
+      val worker = getActorSystem.actorOf(Props(classOf[Worker], masterProxy.ref))
+      masterProxy.expectMsg(RegisterNewWorker)
+
+      worker.tell(WorkerRegistered(workerId, MasterInfo(mockMaster.ref)), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+
+      val executorName = ActorUtil.actorNameForExecutor(appId, executorId)
+      // This is an actor path which the ActorSystemBooter will report back to,
+      // not needed in this test
+      val reportBack = "dummy"
+      val executionContext = ExecutorJVMConfig(Array.empty[String],
+        getActorSystem.settings.config.getString(Constants.GEARPUMP_APPMASTER_ARGS).split(" "),
+        classOf[ActorSystemBooter].getName, Array(executorName, reportBack), None,
+        username = "user")
+
+      // Test LaunchExecutor
+      worker.tell(LaunchExecutor(appId, executorId, Resource(101), executionContext),
+        mockMaster.ref)
+      mockMaster.expectMsg(ExecutorLaunchRejected("There is no free resource on this machine"))
+
+      worker.tell(LaunchExecutor(appId, executorId, Resource(5), executionContext), mockMaster.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(95)))
+
+      worker.tell(ChangeExecutorResource(appId, executorId, Resource(2)), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(98)))
+
+      // Test terminationWatch
+      worker.tell(ShutdownExecutor(appId, executorId, "Test shut down executor"), client.ref)
+      mockMaster.expectMsg(ResourceUpdate(worker, workerId, Resource(100)))
+      client.expectMsg(ShutdownExecutorSucceed(appId, executorId))
+
+      worker.tell(ShutdownExecutor(appId, executorId + 1, "Test shut down executor"), client.ref)
+      client.expectMsg(ShutdownExecutorFailed(
+        s"Can not find executor ${executorId + 1} for app $appId"))
+
+      // Losing the master makes the worker re-register through the proxy with its old id.
+      mockMaster.ref ! PoisonPill
+      masterProxy.expectMsg(RegisterWorker(workerId))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
deleted file mode 100644
index 9e55be6..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.cluster
-
-import akka.actor.ActorRef
-
-import org.apache.gearpump.cluster.master.Master.MasterInfo
-import org.apache.gearpump.cluster.scheduler.Resource
-import org.apache.gearpump.cluster.worker.WorkerId
-
-/**
- * Cluster bootup flow: messages a worker sends to the master.
- */
-object WorkerToMaster {
-
- /** Sent by a newly started worker to ask the master to register it. */
- case object RegisterNewWorker
-
- /** Sent when a worker loses its connection with the master; it re-registers with its old id. */
- case class RegisterWorker(workerId: WorkerId)
-
- /** Sent by a worker to report its currently available resource to the master. */
- case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource)
-}
-
-/** Messages sent from the master side to a worker. */
-object MasterToWorker {
-
- /** Master confirms the reception of RegisterNewWorker or RegisterWorker. */
- case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo)
-
- /**
-  * Signals that a worker's resource update failed, e.g. the scheduler replies with it when a
-  * ResourceUpdate arrives from a worker that has not been registered. The previous note also
-  * described it as "worker has not received reply from master" (TODO confirm that use).
-  * Both fields default to null.
-  */
- case class UpdateResourceFailed(reason: String = null, ex: Throwable = null)
-
- /** Master is in sync with the worker on the resource slots managed by that worker. */
- case object UpdateResourceSucceed
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
deleted file mode 100644
index 9bde4d1..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.embedded
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{Config, ConfigValueFactory}
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER}
-import org.apache.gearpump.util.{LogUtil, Util}
-
-/**
- * Creates an in-process cluster: a master and a single worker hosted in one ActorSystem.
- *
- * Call `start()` before `newClientContext` or `stop()`.
- *
- * @param inputConfig base configuration; `start()` overrides the remote port, the master list,
- *                    the executor/worker same-process flag, metrics and the actor provider
- */
-class EmbeddedCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  /** Boots the master and worker on a free local port and logs where the cluster listens. */
-  def start(): Unit = {
-    val port = Util.findFreePort().get
-    val akkaConf = getConfig(inputConfig, port)
-    _config = akkaConf
-    val system = ActorSystem(MASTER, akkaConf)
-
-    val master = system.actorOf(Props[MasterActor], MASTER)
-
-    0.until(workerCount).foreach { id =>
-      system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-    }
-    this._master = master
-    this._system = system
-
-    LOG.info("=================================")
-    LOG.info("Local Cluster is started at: ")
-    LOG.info(s" 127.0.0.1:$port")
-    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
-  }
-
-  /** Overlays the embedded-cluster settings (port, one local master, ...) on `inputConfig`. */
-  private def getConfig(inputConfig: Config, port: Int): Config = {
-    inputConfig
-      .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port))
-      .withValue(GEARPUMP_CLUSTER_MASTERS,
-        ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava))
-      .withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS,
-        ConfigValueFactory.fromAnyRef(true))
-      .withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true))
-      .withValue("akka.actor.provider",
-        ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
-  }
-
-  /** Returns a client bound to this cluster's master; only meaningful after `start()`. */
-  def newClientContext: ClientContext = ClientContext(_config, _system, _master)
-
-  /**
-   * Stops the master and terminates the actor system, blocking until termination completes.
-   * Does nothing when the cluster has not been started.
-   */
-  def stop(): Unit = {
-    // Before start() the fields are still null; calling into them would throw an NPE.
-    if (_system != null) {
-      _system.stop(_master)
-      _system.terminate()
-      Await.result(_system.whenTerminated, Duration.Inf)
-    }
-  }
-}
-
-/** Factory for [[EmbeddedCluster]] instances. */
-object EmbeddedCluster {
-
-  /** Builds an embedded cluster from `ClusterConfig.master()`; call `start()` to boot it. */
-  def apply(): EmbeddedCluster = new EmbeddedCluster(ClusterConfig.master())
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
deleted file mode 100644
index db71b7b..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor}
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util}
-
-/**
- * Command-line tool that starts a local cluster: one master plus `workernum` workers,
- * all hosted in a single ActorSystem.
- */
-object Local extends AkkaApp with ArgumentsParser {
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)),
-      "workernum" -> CLIOption[Int]("<how many workers to start>", required = false,
-        defaultValue = Some(2)))
-
-  override val description = "Start a local cluster"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    // Load the LOCAL logging configuration before creating the logger instance.
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    // parse may return null; there is nothing to start without parsed arguments.
-    if (null != config) {
-      local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf)
-    }
-  }
-
-  /**
-   * Starts a master and `workerCount` workers in one ActorSystem and blocks until it terminates.
-   *
-   * @param workerCount number of worker actors to create
-   * @param sameProcess when true, sets the system property LOCAL=true before startup
-   * @param akkaConf configuration; its master list must contain exactly one master, whose
-   *                 host equals the configured `GEARPUMP_HOSTNAME`; otherwise an error is
-   *                 logged and nothing is started
-   */
-  def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = {
-    if (sameProcess) {
-      LOG.info("Starting local in same process")
-      System.setProperty("LOCAL", "true")
-    }
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS)
-      .asScala.flatMap(Util.parseHostList)
-    val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME)
-
-    // Reject the config if EITHER condition fails. With `&&`, an empty master list reached
-    // masters.head and crashed, and a single master on a different host was accepted.
-    // `||` short-circuits before masters.head when the list does not have exactly one entry.
-    if (masters.size != 1 || masters.head.host != local) {
-      LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " +
-        s"with ${Constants.GEARPUMP_HOSTNAME}")
-    } else {
-      val hostPort = masters.head
-      val system = ActorSystem(MASTER, akkaConf.
-        withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port))
-      )
-
-      val master = system.actorOf(Props[MasterActor], MASTER)
-      val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER"
-      LOG.info(s"Master is started at $masterPath")
-
-      0.until(workerCount).foreach { id =>
-        system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id)
-      }
-
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
deleted file mode 100644
index f1b9bdf..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.collection.immutable
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.cluster.ClusterEvent._
-import akka.cluster.ddata.DistributedData
-import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}
-import akka.cluster.{Cluster, Member, MemberStatus}
-import com.typesafe.config.ConfigValueFactory
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode}
-import org.apache.gearpump.cluster.master.Master.MasterListUpdated
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil}
-
-/**
- * Command-line tool that starts a Master daemon.
- *
- * Each daemon joins an Akka cluster seeded by all configured masters and hosts a
- * ClusterSingletonManager for MasterWatcher plus a ClusterSingletonProxy named MASTER.
- */
-object Master extends AkkaApp with ArgumentsParser {
-
-  private var LOG: Logger = LogUtil.getLogger(getClass)
-
-  override def akkaConfig: Config = ClusterConfig.master()
-
-  override val options: Array[(String, CLIOption[Any])] =
-    Array("ip" -> CLIOption[String]("<master ip address>", required = true),
-      "port" -> CLIOption("<master port>", required = true))
-
-  override val description = "Start Master daemon"
-
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    // Load the MASTER logging configuration before creating the logger instance.
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER)
-      LogUtil.getLogger(getClass)
-    }
-
-    val config = parse(args)
-    // parse may return null; guard instead of dereferencing it and throwing an NPE.
-    if (null != config) {
-      master(config.getString("ip"), config.getInt("port"), akkaConf)
-    }
-  }
-
-  /** Returns true when the exact string `master:port` appears in `masters`. */
-  private def verifyMaster(master: String, port: Int, masters: Iterable[String]): Boolean = {
-    val hostPort = s"$master:$port"
-    masters.exists(_ == hostPort)
-  }
-
-  /**
-   * Starts the master ActorSystem on `ip:port` and blocks until it terminates.
-   * Exits the JVM with status -1 when `ip:port` is not one of gearpump.cluster.masters.
-   */
-  private def master(ip: String, port: Int, akkaConf: Config): Unit = {
-    val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
-
-    if (!verifyMaster(ip, port, masters)) {
-      LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " +
-        s"gearpump.cluster.masters: ${masters.mkString(", ")}")
-      System.exit(-1)
-    }
-
-    // Every configured master is a seed node, and a majority of them (the quorum) is set as
-    // the minimum number of members with the master role.
-    val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava
-    val quorum = masterList.size() / 2 + 1
-    val masterConfig = akkaConf.
-      withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)).
-      withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)).
-      withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)).
-      withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members",
-        ConfigValueFactory.fromAnyRef(quorum))
-
-    LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}")
-    val system = ActorSystem(MASTER, masterConfig)
-
-    // Obtaining the DistributedData extension presumably starts its replicator on this node.
-    val replicator = DistributedData(system).replicator
-    LOG.info(s"Replicator path: ${replicator.path}")
-
-    // Starts singleton manager
-    system.actorOf(ClusterSingletonManager.props(
-      singletonProps = Props(classOf[MasterWatcher], MASTER),
-      terminationMessage = PoisonPill,
-      settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER)
-        .withRole(MASTER)),
-      name = SINGLETON_MANAGER)
-
-    // Start master proxy
-    val masterProxy = system.actorOf(ClusterSingletonProxy.props(
-      singletonManagerPath = s"/user/${SINGLETON_MANAGER}",
-      // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}".
-      // Master is created when there is a majority of machines started.
-      settings = ClusterSingletonProxySettings(system)
-        .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)),
-      name = MASTER
-    )
-
-    LOG.info(s"master proxy is started at ${masterProxy.path}")
-
-    val mainThread = Thread.currentThread()
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run(): Unit = {
-        if (!system.whenTerminated.isCompleted) {
-          LOG.info("Triggering shutdown hook....")
-
-          system.stop(masterProxy)
-          val cluster = Cluster(system)
-          cluster.leave(cluster.selfAddress)
-          cluster.down(cluster.selfAddress)
-          try {
-            // Give the node up to 3 seconds to terminate after leaving the cluster.
-            Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-          } catch {
-            case _: Exception => // Ignore: typically a timeout; terminate explicitly below.
-          }
-          system.terminate()
-          // The main thread is blocked on whenTerminated; let it finish before the JVM halts.
-          mainThread.join()
-        }
-      }
-    })
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
-
-/**
- * Watches cluster membership of nodes with `role` and owns the master actor.
- *
- * The master child is created only when at least a quorum (a majority of the configured
- * seed nodes) of `role` members is Up. Whenever the tracked Up membership drops below the
- * quorum, this node unsubscribes, leaves the cluster and terminates its actor system.
- *
- * @param role cluster role whose members count towards the quorum
- */
-class MasterWatcher(role: String) extends Actor with ActorLogging {
-  import context.dispatcher
-
-  val cluster = Cluster(context.system)
-
-  val config = context.system.settings.config
-  val masters = config.getList("akka.cluster.seed-nodes")
-  val quorum = masters.size() / 2 + 1
-
-  val system = context.system
-
-  // Sorts by age, oldest first
-  val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
-  var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)
-
-  // Initial behavior: wait for the CurrentClusterState message (see waitForInit).
-  def receive: Receive = waitForInit
-
-  // Subscribes to MemberEvent, re-subscribe when restart
-  override def preStart(): Unit = {
-    cluster.subscribe(self, classOf[MemberEvent])
-  }
-
-  override def postStop(): Unit = {
-    cluster.unsubscribe(self)
-  }
-
-  def matchingRole(member: Member): Boolean = member.hasRole(role)
-
-  /** Seeds the member set from the cluster snapshot, then starts the master or shuts down. */
-  def waitForInit: Receive = {
-    case state: CurrentClusterState =>
-      membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m =>
-        m.status == MemberStatus.Up && matchingRole(m))
-
-      if (membersByAge.size < quorum) {
-        shutdownOnQuorumLoss()
-      } else {
-        val master = context.actorOf(Props(classOf[MasterActor]), MASTER)
-        notifyMasterMembersChange(master)
-        context.become(waitForClusterEvent(master))
-      }
-  }
-
-  /** Tracks role members joining and leaving while `master` is running. */
-  def waitForClusterEvent(master: ActorRef): Receive = {
-    case MemberUp(m) if matchingRole(m) =>
-      membersByAge += m
-      notifyMasterMembersChange(master)
-    case MemberExited(m) if matchingRole(m) =>
-      onMemberGone(master, m)
-    case MemberRemoved(m, _) if matchingRole(m) =>
-      onMemberGone(master, m)
-  }
-
-  /** Forgets a departed member; shuts down on quorum loss, otherwise informs the master. */
-  private def onMemberGone(master: ActorRef, m: Member): Unit = {
-    log.info(s"member removed $m")
-    membersByAge -= m
-    if (membersByAge.size < quorum) {
-      shutdownOnQuorumLoss()
-    } else {
-      notifyMasterMembersChange(master)
-    }
-  }
-
-  /** Logs the quorum loss, switches to waitForShutdown and triggers it with a self message. */
-  private def shutdownOnQuorumLoss(): Unit = {
-    log.info(s"We cannot get a quorum, $quorum, " +
-      s"shutting down...${membersByAge.iterator.mkString(",")}")
-    context.become(waitForShutdown)
-    self ! MasterWatcher.Shutdown
-  }
-
-  /** Sends the master the current role members, oldest first, as MasterNodes. */
-  private def notifyMasterMembersChange(master: ActorRef): Unit = {
-    val nodes = membersByAge.toList.map { member =>
-      MasterNode(member.address.host.getOrElse("Unknown-Host"),
-        member.address.port.getOrElse(0))
-    }
-    master ! MasterListUpdated(nodes)
-  }
-
-  /** Leaves the cluster, stops this actor and terminates the actor system. */
-  def waitForShutdown: Receive = {
-    case MasterWatcher.Shutdown =>
-      cluster.unsubscribe(self)
-      cluster.leave(cluster.selfAddress)
-      context.stop(self)
-      // Runs on the dispatcher, not in this (stopping) actor: wait up to 3 seconds for the
-      // system to terminate on its own, then terminate it explicitly.
-      system.scheduler.scheduleOnce(Duration.Zero) {
-        try {
-          Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
-        } catch {
-          case _: Exception => // Ignore: typically a timeout; terminate explicitly below.
-        }
-        system.terminate()
-      }
-  }
-}
-
-/** Companion holding the internal messages of the MasterWatcher actor. */
-object MasterWatcher {
- /** Self-sent by MasterWatcher after it loses quorum; handled by its waitForShutdown behavior. */
- object Shutdown
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
deleted file mode 100644
index 58a9dec..0000000
--- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.main
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import org.slf4j.Logger
-
-import org.apache.gearpump.cluster.ClusterConfig
-import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.transport.HostPort
-import org.apache.gearpump.util.Constants._
-import org.apache.gearpump.cluster.worker.{Worker => WorkerActor}
-import org.apache.gearpump.util.LogUtil.ProcessType
-import org.apache.gearpump.util.{AkkaApp, LogUtil}
-
-/** Tool to start a worker daemon process. */
-object Worker extends AkkaApp with ArgumentsParser {
-  protected override def akkaConfig = ClusterConfig.worker()
-
-  override val description = "Start a worker daemon"
-
-  var LOG: Logger = LogUtil.getLogger(getClass)
-
-  private def uuid = java.util.UUID.randomUUID.toString
-
-  /**
-   * Starts a worker ActorSystem named by a random UUID, connects it to the configured masters
-   * through a MasterProxy, and blocks until the system terminates.
-   *
-   * @throws IllegalArgumentException if a master-list entry is not of the form host:port or
-   *                                  its port is not an integer
-   */
-  def main(akkaConf: Config, args: Array[String]): Unit = {
-    val id = uuid
-
-    this.LOG = {
-      LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER)
-      // Delay creation of LOG instance to avoid creating an empty log file as we
-      // reset the log file name here
-      LogUtil.getLogger(getClass)
-    }
-
-    // Parse the master list before creating the ActorSystem. A malformed entry then fails
-    // before any actor system exists, instead of leaving a live one behind the exception.
-    val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala
-      .map(parseMasterAddress)
-
-    val system = ActorSystem(id, akkaConf)
-
-    LOG.info(s"Trying to connect to masters ${masterAddress.mkString(",")}...")
-    val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}")
-
-    system.actorOf(Props(classOf[WorkerActor], masterProxy),
-      classOf[WorkerActor].getSimpleName + id)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  /** Parses one "host:port" master-list entry, failing with a message that names the entry. */
-  private def parseMasterAddress(address: String): HostPort = {
-    val hostAndPort = address.split(":")
-    require(hostAndPort.length == 2,
-      s"Invalid entry '$address' in $GEARPUMP_CLUSTER_MASTERS, expected host:port")
-    HostPort(hostAndPort(0), hostAndPort(1).toInt)
-  }
-}
\ No newline at end of file