Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:36 UTC
[26/49] incubator-gearpump git commit: GEARPUMP-11, fix code style
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
index a3f2fc5..df85017 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,17 +19,21 @@
package io.gearpump.streaming.appmaster
import java.lang.management.ManagementFactory
+import scala.concurrent.Future
import akka.actor._
+import org.slf4j.Logger
+
import io.gearpump._
import io.gearpump.cluster.ClientToMaster.{GetLastFailure, GetStallingTasks, QueryHistoryMetrics, ShutdownApplication}
import io.gearpump.cluster.MasterToAppMaster.{AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
-import io.gearpump.cluster.MasterToClient.{HistoryMetricsItem, HistoryMetrics, LastFailure}
+import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure}
import io.gearpump.cluster._
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.metrics.Metrics.ReportMetrics
import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, RegisterExecutor, RegisterTask}
+import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterExecutor, RegisterTask, UnRegisterTask}
import io.gearpump.streaming._
import io.gearpump.streaming.appmaster.AppMaster._
import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor}
@@ -42,30 +46,27 @@ import io.gearpump.streaming.util.ActorPathUtil
import io.gearpump.util.Constants.{APPMASTER_DEFAULT_EXECUTOR_ID, _}
import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
import io.gearpump.util._
-import org.slf4j.Logger
-
-import scala.concurrent.Future
/**
* AppMaster is the head of a streaming application.
*
* It contains:
- * 1. ExecutorManager to manage all executors.
- * 2. TaskManager to manage all tasks,
- * 3. ClockService to track the global clock for this streaming application.
- * 4. Scheduler to decide which a task should be scheduled to.
- *
+ * 1. ExecutorManager to manage all executors.
+ * 2. TaskManager to manage all tasks.
+ * 3. ClockService to track the global clock for this streaming application.
+ * 4. Scheduler to decide where a task should be scheduled.
*/
-class AppMaster(appContext : AppMasterContext, app : AppDescription) extends ApplicationMaster {
+class AppMaster(appContext: AppMasterContext, app: AppDescription) extends ApplicationMaster {
import app.userConfig
import appContext.{appId, masterProxy, username}
- implicit val actorSystem = context.system
- implicit val timeOut = FUTURE_TIMEOUT
+ private implicit val actorSystem = context.system
+ private implicit val timeOut = FUTURE_TIMEOUT
+
import akka.pattern.ask
- implicit val dispatcher = context.dispatcher
+ private implicit val dispatcher = context.dispatcher
- val startTime: TimeStamp = System.currentTimeMillis()
+ private val startTime: TimeStamp = System.currentTimeMillis()
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx")
@@ -83,11 +84,12 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
private var lastFailure = LastFailure(0L, null)
private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID,
- self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
+ self.path.toString,
+ Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active")
private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
- val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
+ private val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
private val userDir = System.getProperty("user.dir")
private val logFile = LogUtil.applicationLogDir(actorSystem.settings.config)
@@ -104,12 +106,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
)
private val historyMetricsService = if (metricsEnabled) {
- // register jvm metrics
- Metrics(context.system).register(new JvmMetricsSet(s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
+ // Registers jvm metrics
+ Metrics(context.system).register(new JvmMetricsSet(
+ s"app${appId}.executor${APPMASTER_DEFAULT_EXECUTOR_ID}"))
- val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(s"app$appId", getHistoryMetricsConfig)))
+ val historyMetricsService = context.actorOf(Props(new HistoryMetricsService(
+ s"app$appId", getHistoryMetricsConfig)))
- val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system))))
+ val metricsReportService = context.actorOf(Props(
+ new MetricsReporterService(Metrics(context.system))))
historyMetricsService.tell(ReportMetrics, metricsReportService)
Some(historyMetricsService)
@@ -129,13 +134,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
jarScheduler, executorManager, clockService.get, self, app.name))))
}
- override def receive : Receive =
- taskMessageHandler orElse
+ override def receive: Receive = {
+ taskMessageHandler orElse
executorMessageHandler orElse
recover orElse
appMasterService orElse
ActorUtil.defaultMsgHandler(self)
+ }
+ /** Handles messages from Tasks */
def taskMessageHandler: Receive = {
case clock: ClockEvent =>
taskManager.foreach(_ forward clock)
@@ -143,7 +150,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
taskManager.foreach(_ forward register)
case unRegister: UnRegisterTask =>
taskManager.foreach(_ forward unRegister)
- // check whether this processor dead, if it is, then we should remove it from clockService.
+ // Checks whether this processor is dead; if so, removes it from clockService.
clockService.foreach(_ forward CheckProcessorDeath(unRegister.taskId.processorId))
case replay: ReplayFromTimestampWindowTrailingEdge =>
taskManager.foreach(_ forward replay)
@@ -163,6 +170,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
clockService.foreach(_ forward GetCheckpointClock)
}
+ /** Handles messages from Executors */
def executorMessageHandler: Receive = {
case register: RegisterExecutor =>
executorManager forward register
@@ -170,6 +178,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
historyMetricsService.foreach(_ forward ReportMetrics)
}
+ /** Handles messages from AppMaster */
def appMasterService: Receive = {
case appMasterDataDetailRequest: AppMasterDataDetailRequest =>
LOG.debug(s"AppMaster got AppMasterDataDetailRequest for $appId ")
@@ -179,14 +188,17 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
val taskFuture = getTaskList
val dagFuture = getDAG
- val appMasterDataDetail = for {executors <- executorsFuture
+ val appMasterDataDetail = for {
+ executors <- executorsFuture
clock <- clockFuture
tasks <- taskFuture
dag <- dagFuture
} yield {
val graph = dag.graph
- val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {_.keys.toList}
+ val executorToTasks = tasks.tasks.groupBy(_._2).mapValues {
+ _.keys.toList
+ }
val processors = dag.processors.map { kv =>
val processor = kv._2
@@ -195,7 +207,8 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
(kv._1, TaskCount(kv._2.count(_.processorId == id)))
}.filter(_._2.count != 0)
(id,
- ProcessorSummary(id, taskClass, parallelism, description, taskConf, life, tasks.keys.toList, tasks))
+ ProcessorSummary(id, taskClass, parallelism, description, taskConf, life,
+ tasks.keys.toList, tasks))
}
StreamAppMasterSummary(
@@ -211,7 +224,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
logFile = logFile.getAbsolutePath,
processors = processors,
processorLevels = graph.vertexHierarchyLevelMap(),
- dag = graph.mapEdge {(node1, edge, node2) =>
+ dag = graph.mapEdge { (node1, edge, node2) =>
edge.partitionerFactory.name
},
executors = executors,
@@ -221,16 +234,16 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
val client = sender()
- appMasterDataDetail.map{appData =>
+ appMasterDataDetail.map { appData =>
client ! appData
}
-// TODO: WebSocket is buggy and disabled.
-// case appMasterMetricsRequest: AppMasterMetricsRequest =>
-// val client = sender()
-// actorSystem.eventStream.subscribe(client, classOf[MetricType])
+ // TODO: WebSocket is buggy and disabled.
+ // case appMasterMetricsRequest: AppMasterMetricsRequest =>
+ // val client = sender()
+ // actorSystem.eventStream.subscribe(client, classOf[MetricType])
case query: QueryHistoryMetrics =>
if (historyMetricsService.isEmpty) {
- // return empty metrics so that we don't hang the UI
+ // Returns empty metrics so that we don't hang the UI
sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
} else {
historyMetricsService.get forward query
@@ -241,34 +254,37 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
dagManager forward replaceDAG
case GetLastFailure(_) =>
sender ! lastFailure
- case get@ GetExecutorSummary(executorId) =>
+ case get@GetExecutorSummary(executorId) =>
val client = sender
if (executorId == APPMASTER_DEFAULT_EXECUTOR_ID) {
client ! appMasterExecutorSummary
} else {
- ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { map =>
- map.get(executorId).foreach { executor =>
- executor.executor.tell(get, client)
+ ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+ .map { map =>
+ map.get(executorId).foreach { executor =>
+ executor.executor.tell(get, client)
+ }
}
- }
}
- case query@ QueryExecutorConfig(executorId) =>
+ case query@QueryExecutorConfig(executorId) =>
val client = sender
if (executorId == -1) {
val systemConfig = context.system.settings.config
sender ! ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
} else {
- ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { map =>
- map.get(executorId).foreach { executor =>
- executor.executor.tell(query, client)
+ ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+ .map { map =>
+ map.get(executorId).foreach { executor =>
+ executor.executor.tell(query, client)
+ }
}
- }
}
- }
+ }
+ /** Error handling */
def recover: Receive = {
case FailedToRecover(errorMsg) =>
- if(context.children.toList.contains(sender())){
+ if (context.children.toList.contains(sender())) {
LOG.error(errorMsg)
masterProxy ! ShutdownApplication(appId)
}
@@ -288,14 +304,15 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
}
private def executorBrief: Future[List[ExecutorBrief]] = {
- ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo).map { infos =>
- infos.values.map { info =>
- ExecutorBrief(info.executorId,
- info.executor.path.toSerializationFormat,
- info.worker.workerId,
- "active")
- }.toList :+ appMasterBrief
- }
+ ActorUtil.askActor[Map[ExecutorId, ExecutorInfo]](executorManager, GetExecutorInfo)
+ .map { infos =>
+ infos.values.map { info =>
+ ExecutorBrief(info.executorId,
+ info.executor.path.toSerializationFormat,
+ info.worker.workerId,
+ "active")
+ }.toList :+ appMasterBrief
+ }
}
private def getTaskList: Future[TaskList] = {
@@ -312,10 +329,11 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
}
private def getUpdatedDAG(): DAG = {
- val dag = DAG(userConfig.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get)
- val updated = dag.processors.map{ idAndProcessor =>
+ val dag = DAG(userConfig.getValue[Graph[ProcessorDescription,
+ PartitionerDescription]](StreamApplication.DAG).get)
+ val updated = dag.processors.map { idAndProcessor =>
val (id, oldProcessor) = idAndProcessor
- val newProcessor = if(oldProcessor.jar == null) {
+ val newProcessor = if (oldProcessor.jar == null) {
oldProcessor.copy(jar = appContext.appJar.getOrElse(null))
} else {
oldProcessor
@@ -327,14 +345,18 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap
}
object AppMaster {
+
+ /** Master node doesn't return resource in time */
case object AllocateResourceTimeOut
+ /** Query task ActorRef by providing the taskId */
case class LookupTaskActorRef(taskId: TaskId)
case class TaskActorRef(task: ActorRef)
class ServiceNotAvailableException(reason: String) extends Exception(reason)
- case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
+ case class ExecutorBrief(
+ executorId: ExecutorId, executor: String, workerId: WorkerId, status: String)
}
\ No newline at end of file
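
The appMasterService handler above gathers executor, clock, task, and DAG information by creating the four Futures first and then combining them in a for-comprehension, so the queries run concurrently and the yield block assembles the summary once all have completed. A minimal standalone sketch of that pattern (queryExecutors and queryClock below are illustrative stand-ins, not Gearpump APIs):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object ForComprehensionSketch extends App {
      // Hypothetical stand-ins for the executor/clock queries made by AppMaster.
      def queryExecutors: Future[List[String]] = Future(List("executor-1", "executor-2"))
      def queryClock: Future[Long] = Future(42L)

      // Creating the Futures before the for-comprehension starts them in parallel;
      // the yield block runs only after both have completed.
      val executorsFuture = queryExecutors
      val clockFuture = queryClock
      val summary = for {
        executors <- executorsFuture
        clock <- clockFuture
      } yield s"executors=$executors, minClock=$clock"

      println(Await.result(summary, 5.seconds))
    }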
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
index 67fd592..bd18bdc 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,45 +21,47 @@ package io.gearpump.streaming.appmaster
import java.util
import java.util.Date
import java.util.concurrent.TimeUnit
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.language.implicitConversions
import akka.actor.{Actor, Cancellable, Stash}
+import org.slf4j.Logger
+
+import io.gearpump.TimeStamp
+import io.gearpump.cluster.ClientToMaster.GetStallingTasks
import io.gearpump.google.common.primitives.Longs
-import io.gearpump.streaming.task._
+import io.gearpump.streaming.AppMasterToMaster.StallingTasks
import io.gearpump.streaming._
+import io.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue
+import io.gearpump.streaming.appmaster.ClockService._
import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.ClientToMaster.GetStallingTasks
-import AppMasterToMaster.StallingTasks
-import ClockService.HealthChecker.ClockValue
-import ClockService._
+import io.gearpump.streaming.task._
import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
/**
- * The clockService will maintain a global view of message timestamp in the application
+ * Maintains a global view of message timestamps in the application
*/
-class ClockService(private var dag : DAG, store: AppDataStore) extends Actor with Stash {
+class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with Stash {
private val LOG: Logger = LogUtil.getLogger(getClass)
import context.dispatcher
private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60)
- private var healthCheckScheduler : Cancellable = null
- private var snapshotScheduler : Cancellable = null
+ private var healthCheckScheduler: Cancellable = null
+ private var snapshotScheduler: Cancellable = null
- override def receive = null
+ override def receive: Receive = null
- override def preStart() : Unit = {
+ override def preStart(): Unit = {
LOG.info("Initializing Clock service, get snapshotted StartClock ....")
store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock =>
val startClock = Option(clock).getOrElse(0L)
minCheckpointClock = Some(startClock)
+ // Recovers the application by restarting from the last persisted startClock.
+ // Only messages after startClock will be replayed.
self ! StoredStartClock(startClock)
LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock")
}
@@ -67,12 +69,15 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
context.become(waitForStartClock)
}
- override def postStop() : Unit = {
+ override def postStop(): Unit = {
Option(healthCheckScheduler).map(_.cancel)
Option(snapshotScheduler).map(_.cancel)
}
- var clocks = Map.empty[ProcessorId, ProcessorClock]
+ // Keeps track of the clock values of all processors.
+ private var clocks = Map.empty[ProcessorId, ProcessorClock]
+
+ // Each processor can have multiple upstream processors. This keeps track of the upstream clocks.
private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]]
// We use Array instead of List for Performance consideration
@@ -106,12 +111,12 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
this.clocks = dag.processors.filter(startClock < _._2.life.death).
map { pair =>
- val (processorId, processor) = pair
- val parallelism = processor.parallelism
- val clock = new ProcessorClock(processorId, processor.life, parallelism)
- clock.init(startClock)
- (processorId, clock)
- }
+ val (processorId, processor) = pair
+ val parallelism = processor.parallelism
+ val clock = new ProcessorClock(processorId, processor.life, parallelism)
+ clock.init(startClock)
+ (processorId, clock)
+ }
this.upstreamClocks = clocks.map { pair =>
val (processorId, processor) = pair
@@ -150,7 +155,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
(processorId, upstreamClocks.toArray)
}
- // init the clock of all processors.
+ // Initializes the clocks of all processors.
newClocks.map { pair =>
val (processorId, processorClock) = pair
val upstreamClock = getUpStreamMinClock(processorId)
@@ -174,11 +179,12 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
import context.dispatcher
- //period report current clock
- healthCheckScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
+ // Periodically reports the current clock
+ healthCheckScheduler = context.system.scheduler.schedule(
+ new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck)
- //period snpashot latest min startclock to external storage
+ // Periodically snapshots the latest min startClock to external storage
snapshotScheduler = context.system.scheduler.schedule(new FiniteDuration(5, TimeUnit.SECONDS),
new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock)
@@ -206,7 +212,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
case GetUpstreamMinClock(task) =>
sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
- case update@ UpdateClock(task, clock) =>
+ case update@UpdateClock(task, clock) =>
val upstreamMinClock = getUpStreamMinClock(task.processorId)
val processorClock = clocks.get(task.processorId)
@@ -233,7 +239,8 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
LOG.info(s"Removing $processorId from clock service...")
removeProcessor(processorId)
} else {
- LOG.info(s"Unsuccessfully in removing $processorId from clock service..., min: ${processorClock.get.min}, life: $life")
+ LOG.info(s"Unsuccessfully in removing $processorId from clock service...," +
+ s" min: ${processorClock.get.min}, life: $life")
}
}
case HealthCheck =>
@@ -253,15 +260,15 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
case ChangeToNewDAG(dag) =>
if (dag.version > this.dag.version) {
- // transit to a new dag version
+ // Transitions to a new dag version
this.dag = dag
dynamicDAG(dag, getStartClock)
} else {
- // restart current dag.
+ // Restarts current dag.
recoverDag(dag, getStartClock)
}
- LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess")
- sender ! ChangeToNewDAGSuccess(clocks.map{ pair =>
+ LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess")
+ sender ! ChangeToNewDAGSuccess(clocks.map { pair =>
val (id, clock) = pair
(id, clock.min)
})
@@ -279,7 +286,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
upstreamClocks = upstreamClocks - processorId
- // remove dead processor from checkpoints.
+ // Removes dead processor from checkpoints.
checkpointClocks = checkpointClocks.filter { kv =>
val (taskId, processor) = kv
taskId.processorId != processorId
@@ -290,12 +297,13 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
ProcessorClocks.minClock(processorClocks)
}
- def selfCheck() : Unit = {
+ def selfCheck(): Unit = {
val minTimestamp = minClock
if (Long.MaxValue == minTimestamp) {
processorClocks.foreach { clock =>
- LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, taskClocks: "+ clock.taskClocks.mkString(","))
+ LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " +
+ s"taskClocks: " + clock.taskClocks.mkString(","))
}
}
@@ -306,7 +314,7 @@ class ClockService(private var dag : DAG, store: AppDataStore) extends Actor wit
minCheckpointClock.getOrElse(minClock)
}
- private def snapshotStartClock() : Unit = {
+ private def snapshotStartClock(): Unit = {
store.put(START_CLOCK, getStartClock)
}
@@ -329,10 +337,11 @@ object ClockService {
case object HealthCheck
class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int,
- private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
-
+ private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
- def copy(life: LifeTime): ProcessorClock = new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
+ def copy(life: LifeTime): ProcessorClock = {
+ new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
+ }
def min: TimeStamp = _min
def taskClocks: Array[TimeStamp] = _taskClocks
@@ -362,17 +371,22 @@ object ClockService {
private val LOG: Logger = LogUtil.getLogger(getClass)
private var minClock: ClockValue = null
- private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000 // 60 seconds
+ private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000
+ // 60 seconds
private var stallingTasks = Array.empty[TaskId]
- def check(currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock], dag: DAG, now: TimeStamp): Unit = {
+ /** Checks for stalling tasks */
+ def check(
+ currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
+ dag: DAG, now: TimeStamp): Unit = {
var isClockStalling = false
if (null == minClock || currentMinClock > minClock.appClock) {
minClock = ClockValue(systemClock = now, appClock = currentMinClock)
} else {
- //clock not advancing
+ // Clock not advancing
if (now > minClock.systemClock + stallingThresholdMilliseconds) {
- LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock)/1000} seconds since ${minClock.prettyPrint}...")
+ LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) / 1000} seconds " +
+ s"since ${minClock.prettyPrint}...")
isClockStalling = true
}
}
@@ -387,7 +401,7 @@ object ClockService {
}
}
- processorId.foreach {processorId =>
+ processorId.foreach { processorId =>
val processorClock = processorClocks(processorId)
val taskClocks = processorClock.taskClocks
stallingTasks = taskClocks.zipWithIndex.filter(_._1 == minClock.appClock).
@@ -414,10 +428,11 @@ object ClockService {
object ProcessorClocks {
+ // Gets the min clock of all processors
def minClock(clock: Array[ProcessorClock]): TimeStamp = {
var i = 0
var min = if (clock.length == 0) 0L else clock(0).min
- while(i < clock.length) {
+ while (i < clock.length) {
min = Math.min(min, clock(i).min)
i += 1
}
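
ProcessorClocks.minClock above deliberately scans an Array with a while loop ("We use Array instead of List for Performance consideration", per the comment in the file). A self-contained equivalent, with a simplified stand-in for ProcessorClock:

    object MinClockSketch extends App {
      // Simplified stand-in for ClockService.ProcessorClock: only the min field matters here.
      final case class ProcessorClock(processorId: Int, min: Long)

      // Mirrors ProcessorClocks.minClock: an imperative Array scan instead of
      // clocks.map(_.min).min, avoiding intermediate collections on this hot path.
      def minClock(clock: Array[ProcessorClock]): Long = {
        var i = 0
        var min = if (clock.length == 0) 0L else clock(0).min
        while (i < clock.length) {
          min = Math.min(min, clock(i).min)
          i += 1
        }
        min
      }

      println(minClock(Array(ProcessorClock(0, 100L), ProcessorClock(1, 42L)))) // prints 42
    }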
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
index 7244f95..f18b387 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
@@ -18,44 +18,48 @@
package io.gearpump.streaming.appmaster
-import akka.actor.{ActorRef, Actor, Stash}
-import io.gearpump.streaming._
-import io.gearpump.streaming.task.Subscriber
-import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.PartitionerDescription
-import DagManager._
-import io.gearpump.util.{LogUtil, Graph}
+import scala.concurrent.Future
+
+import akka.actor.{Actor, ActorRef, Stash}
import org.slf4j.Logger
-import scala.concurrent.Future
+import io.gearpump.cluster.UserConfig
+import io.gearpump.partitioner.PartitionerDescription
+import io.gearpump.streaming._
+import io.gearpump.streaming.appmaster.DagManager._
+import io.gearpump.streaming.storage.AppDataStore
+import io.gearpump.streaming.task.Subscriber
+import io.gearpump.util.{Graph, LogUtil}
/**
- * Will handle dag modification and other stuff related with DAG
- */
-
+ * Handles dag modifications and other DAG-related operations.
+ *
+ * DagManager maintains multiple versions of the DAG; each version is immutable.
+ * For operations like modifying a processor, it creates a new version of the DAG.
+ */
class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: Option[DAG])
- extends Actor with Stash {
+ extends Actor with Stash {
+
import context.dispatcher
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
private val NOT_INITIALIZED = -1
private var dags = List.empty[DAG]
private var maxProcessorId = -1
- implicit val system = context.system
+ private implicit val system = context.system
private var watchers = List.empty[ActorRef]
override def receive: Receive = null
- override def preStart() : Unit = {
+ override def preStart(): Unit = {
LOG.info("Initializing Dag Service, get stored Dag ....")
store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag =>
if (storedDag != null) {
dags :+= storedDag
} else {
dags :+= dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription,
- PartitionerDescription]](StreamApplication.DAG).get))
+ PartitionerDescription]](StreamApplication.DAG).get))
}
maxProcessorId = {
val keys = dags.head.processors.keys
@@ -91,20 +95,25 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O
def dagService: Receive = {
case GetLatestDAG =>
+ // Gets the latest version of the DAG.
sender ! LatestDAG(dags.last)
case GetTaskLaunchData(version, processorId, context) =>
+ // Task information such as the processor class and downstream subscriber processors.
dags.find(_.version == version).foreach { dag =>
LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version")
sender ! taskLaunchData(dag, processorId, context)
}
case ReplaceProcessor(oldProcessorId, inputNewProcessor) =>
+ // Replaces a processor with a new implementation. The upstream and downstream
+ // processors are NOT changed.
var newProcessor = inputNewProcessor.copy(id = nextProcessorId)
- if (inputNewProcessor.jar == null){
+ if (inputNewProcessor.jar == null) {
val oldJar = dags.last.processors.get(oldProcessorId).get
newProcessor = newProcessor.copy(jar = oldJar.jar)
}
if (dags.length > 1) {
- sender ! DAGOperationFailed("We are in the process of handling previous dynamic dag change")
+ sender ! DAGOperationFailed(
+ "We are in the process of handling previous dynamic dag change")
} else {
val oldDAG = dags.last
val newVersion = oldDAG.version + 1
@@ -118,21 +127,25 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O
}
case WatchChange(watcher) =>
+ // Watches for modifications of this DAG.
if (!this.watchers.contains(watcher)) {
this.watchers :+= watcher
}
case NewDAGDeployed(dagVersion) =>
- // means the new DAG version has been successfully deployed.
- // remove obsolete versions.
+ // Means the dynamic DAG transition has completed and the new DAG version has been
+ // successfully deployed. Obsolete dag versions will be removed.
if (dagVersion != NOT_INITIALIZED) {
dags = dags.filter(_.version == dagVersion)
store.put(StreamApplication.DAG, dags.last)
}
}
- private def replaceDAG(dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int): DAG = {
- val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth, newProcessor.life.birth)
+ private def replaceDAG(
+ dag: DAG, oldProcessorId: ProcessorId, newProcessor: ProcessorDescription, newVersion: Int)
+ : DAG = {
+ val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
+ newProcessor.life.birth)
val newProcessorMap = dag.processors ++
Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = oldProcessorLife),
@@ -153,11 +166,13 @@ object DagManager {
case class LatestDAG(dag: DAG)
case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: AnyRef = null)
- case class TaskLaunchData(processorDescription : ProcessorDescription, subscribers: List[Subscriber], context: AnyRef = null)
+ case class TaskLaunchData(processorDescription : ProcessorDescription,
+ subscribers: List[Subscriber], context: AnyRef = null)
sealed trait DAGOperation
- case class ReplaceProcessor(oldProcessorId: ProcessorId, newProcessorDescription: ProcessorDescription) extends DAGOperation
+ case class ReplaceProcessor(oldProcessorId: ProcessorId,
+ newProcessorDescription: ProcessorDescription) extends DAGOperation
sealed trait DAGOperationResult
case object DAGOperationSuccess extends DAGOperationResult
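
DagManager above treats each DAG version as immutable: ReplaceProcessor derives a new version from the last one, a second change is rejected while a transition is still in flight, and NewDAGDeployed prunes obsolete versions. A toy sketch of that versioning discipline (the Dag type below is a placeholder, not the real io.gearpump.streaming.DAG):

    object DagVersioningSketch extends App {
      // Placeholder DAG: just a version number and a processorId -> implementation map.
      final case class Dag(version: Int, processors: Map[Int, String])

      private var dags = List(Dag(version = 0, processors = Map(0 -> "Source", 1 -> "Sink")))

      // Like ReplaceProcessor: never mutate an existing version, append a derived one,
      // and refuse a new change while a previous dynamic dag change is still pending.
      def replaceProcessor(oldId: Int, newImpl: String): Either[String, Dag] = {
        if (dags.length > 1) {
          Left("We are in the process of handling previous dynamic dag change")
        } else {
          val old = dags.last
          val next = Dag(old.version + 1, old.processors.updated(oldId, newImpl))
          dags :+= next
          Right(next)
        }
      }

      // Like NewDAGDeployed: once a version is deployed, drop the obsolete ones.
      def deployed(version: Int): Unit = dags = dags.filter(_.version == version)

      println(replaceProcessor(1, "NewSink")) // Right(Dag(1,...))
      deployed(1)
      println(dags)                           // only version 1 remains
    }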
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
index ebe6c11..ab99af5 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,34 +18,32 @@
package io.gearpump.streaming.appmaster
+import scala.concurrent.duration._
+import scala.util.{Failure, Try}
+
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.remote.RemoteScope
import com.typesafe.config.Config
-import io.gearpump.WorkerId
+import org.apache.commons.lang.exception.ExceptionUtils
+
import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
import io.gearpump.cluster.appmaster.WorkerInfo
import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, UserConfig}
import io.gearpump.streaming.ExecutorId
import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
import io.gearpump.streaming.appmaster.ExecutorManager._
import io.gearpump.streaming.executor.Executor
import io.gearpump.util.{LogUtil, Util}
-import org.apache.commons.lang.exception.ExceptionUtils
-
-import scala.concurrent.duration._
-import scala.util.Try
/**
- * ExecutorManager manage all executors.
+ * ExecutorManager manages the start and stop of all executors.
*
* ExecutorManager will launch an Executor when asked. It hides details like starting
- * a new ExecutorSystem from user.
- *
- * Please use ExecutorManager.props() to construct this actor
- *
+ * a new ExecutorSystem from the user. Please use ExecutorManager.props() to construct this actor.
*/
private[appmaster] class ExecutorManager(
userConfig: UserConfig,
@@ -60,10 +58,11 @@ private[appmaster] class ExecutorManager(
import appContext.{appId, masterProxy, username}
private var taskManager: ActorRef = null
- implicit val actorSystem = context.system
+
+ private implicit val actorSystem = context.system
private val systemConfig = context.system.settings.config
- private var executors = Map.empty[Int, ExecutorInfo]
+ private var executors = Map.empty[Int, ExecutorInfo]
def receive: Receive = waitForTaskManager
@@ -73,18 +72,26 @@ private[appmaster] class ExecutorManager(
context.become(service orElse terminationWatch)
}
- override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
+ // If something goes wrong on an executor, ExecutorManager stops the current executor
+ // and waits for the AppMaster to start a new one.
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
+ withinTimeRange = 1.minute) {
case ex: Throwable =>
val executorId = Try(sender.path.name.toInt)
executorId match {
case scala.util.Success(id) => {
executors -= id
- LOG.error(s"Executor $id throws exception, stop it...\n" + ExceptionUtils.getStackTrace(ex))
+ LOG.error(s"Executor $id throws exception, stop it...\n" +
+ ExceptionUtils.getStackTrace(ex))
+ }
+ case Failure(ex) => {
+ LOG.error(s"Sender ${sender.path} is dead, but seems it is not an executor...")
}
}
Stop
}
+ // Responds to outside queries
def service: Receive = {
case StartExecutors(resources, jar) =>
masterProxy ! StartExecutorSystems(resources, getExecutorJvmConfig(Some(jar)))
@@ -92,30 +99,31 @@ private[appmaster] class ExecutorManager(
import executorSystem.{address, executorSystemId, resource => executorResource, worker}
val executorId = executorSystemId
- val executorContext = ExecutorContext(executorId, worker, appId, appName, appMaster = context.parent, executorResource)
+ val executorContext = ExecutorContext(executorId, worker, appId, appName,
+ appMaster = context.parent, executorResource)
executors += executorId -> ExecutorInfo(executorId, null, worker, boundedJar)
- //start executor
- val executor = context.actorOf(executorFactory(executorContext, userConfig, address, executorId),
- executorId.toString)
+ // Starts executor
+ val executor = context.actorOf(executorFactory(executorContext, userConfig,
+ address, executorId), executorId.toString)
executorSystem.bindLifeCycleWith(executor)
case StartExecutorSystemTimeout =>
taskManager ! StartExecutorsTimeOut
case RegisterExecutor(executor, executorId, resource, worker) =>
LOG.info(s"executor $executorId has been launched")
- //watch for executor termination
+ // Watches for executor termination
context.watch(executor)
val executorInfo = executors.get(executorId).get
executors += executorId -> executorInfo.copy(executor = executor)
taskManager ! ExecutorStarted(executorId, resource, worker.workerId, executorInfo.boundedJar)
- // broadcast message to all executors
+ // Broadcasts message to all executors
case BroadCast(msg) =>
LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors")
- context.children.foreach(_ forward msg)
+ context.children.foreach(_ forward msg)
- // unicast message to single executor
+ // Unicasts message to single executor
case UniCast(executorId, msg) =>
LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor $executorId")
val executor = executors.get(executorId)
@@ -124,13 +132,14 @@ private[appmaster] class ExecutorManager(
case GetExecutorInfo =>
sender ! executors
- // update resource usage, so that reclaim unused resource.
+ // Tells ExecutorManager which resources are occupied. ExecutorManager can use this
+ // information to tell the worker to reclaim unused resources.
case ExecutorResourceUsageSummary(resources) =>
executors.foreach { pair =>
val (executorId, executor) = pair
val resource = resources.get(executorId)
val worker = executor.worker.ref
- // notify the worker the actual resource used by this application.
+ // Notifies the worker of the actual resource used by this application.
resource match {
case Some(resource) =>
worker ! ChangeExecutorResource(appId, executorId, resource)
@@ -138,9 +147,9 @@ private[appmaster] class ExecutorManager(
worker ! ChangeExecutorResource(appId, executorId, Resource(0))
}
}
- }
+ }
- def terminationWatch : Receive = {
+ def terminationWatch: Receive = {
case Terminated(actor) =>
val executorId = Try(actor.path.name.toInt)
executorId match {
@@ -149,14 +158,17 @@ private[appmaster] class ExecutorManager(
LOG.error(s"Executor $id is down")
taskManager ! ExecutorStopped(id)
}
- case scala.util.Failure(ex) => LOG.error(s"failed to get the executor Id from path string ${actor.path}" , ex)
+ case scala.util.Failure(ex) =>
+ LOG.error(s"failed to get the executor Id from path string ${actor.path}", ex)
}
}
private def getExecutorJvmConfig(jar: Option[AppJar]): ExecutorSystemJvmConfig = {
val executorAkkaConfig = clusterConfig
val jvmSetting = Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor
- ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar, username, executorAkkaConfig)
+
+ ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar,
+ username, executorAkkaConfig)
}
}
@@ -168,26 +180,30 @@ private[appmaster] object ExecutorManager {
case object GetExecutorInfo
- case class ExecutorStarted(executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
+ case class ExecutorStarted(
+ executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar])
case class ExecutorStopped(executorId: Int)
case class SetTaskManager(taskManager: ActorRef)
case object StartExecutorsTimeOut
- def props(userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String): Props = {
+ def props(
+ userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: Config, appName: String)
+ : Props = {
val executorFactory =
- (executorContext: ExecutorContext,
- userConfig: UserConfig,
- address: Address,
- executorId: ExecutorId) =>
- Props(classOf[Executor], executorContext, userConfig)
- .withDeploy(Deploy(scope = RemoteScope(address)))
+ (executorContext: ExecutorContext,
+ userConfig: UserConfig,
+ address: Address,
+ executorId: ExecutorId) =>
+ Props(classOf[Executor], executorContext, userConfig)
+ .withDeploy(Deploy(scope = RemoteScope(address)))
Props(new ExecutorManager(userConfig, appContext, executorFactory, clusterConfig, appName))
}
case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource])
- case class ExecutorInfo(executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])
+ case class ExecutorInfo(
+ executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, boundedJar: Option[AppJar])
}
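
The supervisorStrategy above encodes the recovery contract: when an executor child throws, ExecutorManager stops only that child (OneForOneStrategy + Stop) and relies on the AppMaster to start a replacement. A compilable sketch of that pattern, assuming Akka actors on the classpath (the class name and log text are illustrative):

    import scala.concurrent.duration._
    import scala.util.Try

    import akka.actor.SupervisorStrategy.Stop
    import akka.actor.{Actor, OneForOneStrategy}

    class ExecutorSupervisorSketch extends Actor {
      // On any executor failure: stop that child only, never restart it in place.
      // A replacement executor is expected to be started by the AppMaster.
      override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
        withinTimeRange = 1.minute) {
        case ex: Throwable =>
          // Executor children are named by executor id, so the failing child's
          // path name identifies which executor died.
          Try(sender().path.name.toInt).foreach { id =>
            println(s"Executor $id threw ${ex.getClass.getSimpleName}, stopping it")
          }
          Stop
      }

      def receive: Receive = { case msg => println(s"received $msg") }
    }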
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
index e769a0d..a2a9c74 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,51 +17,69 @@
*/
package io.gearpump.streaming.appmaster
+import scala.concurrent.Future
+
import akka.actor._
+import akka.pattern.ask
import com.typesafe.config.Config
-import io.gearpump.{WorkerId, TimeStamp}
-import io.gearpump.streaming.task.{StartClock, TaskId}
-import io.gearpump.streaming.{ProcessorDescription, DAG}
+
+import io.gearpump.TimeStamp
import io.gearpump.cluster.AppJar
import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.partitioner.PartitionerDescription
import io.gearpump.streaming.appmaster.JarScheduler._
-import io.gearpump.util.{LogUtil, Constants, Graph}
-import akka.pattern.ask
-import scala.concurrent.Future
+import io.gearpump.streaming.task.TaskId
+import io.gearpump.streaming.{DAG, ProcessorDescription}
+import io.gearpump.util.{Constants, Graph, LogUtil}
/**
+ * Different processors of the stream application can use different jars. JarScheduler is the
+ * scheduler across these jars.
*
- * With JarScheduler, we allows a DAG to be partitioned into several
- * parts, with each part use its own jar file.
+ * For a DAG of multiple processors, each processor can have its own jar. Tasks of the same
+ * jar are scheduled by a TaskScheduler, and TaskSchedulers are coordinated by JarScheduler.
*
+ * At runtime, the implementation is delegated to the actor JarSchedulerImpl.
*/
-class JarScheduler(appId : Int, appName: String, config: Config, factory: ActorRefFactory) {
+class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRefFactory) {
private val actor: ActorRef = factory.actorOf(Props(new JarSchedulerImpl(appId, appName, config)))
private implicit val dispatcher = factory.dispatcher
private implicit val timeout = Constants.FUTURE_TIMEOUT
+ /** Sets the current DAG version active */
def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
actor ! TransitToNewDag
- startClock.map {start =>
+ startClock.map { start =>
actor ! NewDag(dag, start)
}
}
- def getRequestDetails(): Future[Array[ResourceRequestDetail]] = {
- (actor ? GetRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]]
+ /** AppMaster asks JarScheduler how much resource it wants */
+ def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = {
+ (actor ? GetResourceRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]]
}
- def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource): Future[List[TaskId]] = {
- (actor ? ScheduleTask(appJar, workerId, executorId, resource)).asInstanceOf[Future[List[TaskId]]]
+ /**
+ * AppMaster has resources allocated and asks the jar scheduler to schedule tasks
+ * for this executor.
+ */
+ def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
+ : Future[List[TaskId]] = {
+ (actor ? ScheduleTask(appJar, workerId, executorId, resource))
+ .asInstanceOf[Future[List[TaskId]]]
}
+ /**
+ * An executor JVM process has died. AppMaster asks the jar scheduler to re-schedule the
+ * impacted tasks.
+ */
def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = {
(actor ? ExecutorFailed(executorId)).asInstanceOf[Future[Option[ResourceRequestDetail]]]
}
}
-object JarScheduler{
+object JarScheduler {
case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
@@ -69,14 +87,24 @@ object JarScheduler{
case object TransitToNewDag
- case object GetRequestDetails
+ case object GetResourceRequestDetails
+ /**
+ * Schedule tasks for one appJar.
+ *
+ * @param appJar Application jar.
+ * @param workerId Worker machine Id.
+ * @param executorId Executor Id.
+ * @param resource Slots that are available.
+ */
case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource)
+ /** An executor JVM has died; tries to recover tasks located on the failed executor */
case class ExecutorFailed(executorId: Int)
- class JarSchedulerImpl(appId : Int, appName: String, config: Config) extends Actor with Stash {
+ class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends Actor with Stash {
+ // Each TaskScheduler maps to a jar.
private var taskSchedulers = Map.empty[AppJar, TaskScheduler]
private val LOG = LogUtil.getLogger(getClass)
@@ -84,26 +112,29 @@ object JarScheduler{
def receive: Receive = waitForNewDag
def waitForNewDag: Receive = {
- case TransitToNewDag => // continue current state
+ case TransitToNewDag => // Continue current state
case NewDag(dag, startTime) =>
LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: $startTime")
val processors = dag.processors.values.groupBy(_.jar)
+
taskSchedulers = processors.map { jarAndProcessors =>
val (jar, processors) = jarAndProcessors
- //Construct the sub DAG
- val graph = Graph.empty[ProcessorDescription, PartitionerDescription]
- processors.foreach{processor =>
+ // Constructs the sub-DAG; each sub-DAG maps to a separate jar.
+ val subGraph = Graph.empty[ProcessorDescription, PartitionerDescription]
+ processors.foreach { processor =>
if (startTime < processor.life.death) {
- graph.addVertex(processor)
+ subGraph.addVertex(processor)
}
}
- val subDag = DAG(graph)
- val taskScheduler = taskSchedulers.getOrElse(jar, new TaskSchedulerImpl(appId, appName, config))
+ val subDagForSingleJar = DAG(subGraph)
- LOG.info(s"Set DAG for TaskScheduler, count: " + subDag.processors.size)
- taskScheduler.setDAG(subDag)
+ val taskScheduler = taskSchedulers
+ .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config))
+
+ LOG.info(s"Set DAG for TaskScheduler, count: " + subDagForSingleJar.processors.size)
+ taskScheduler.setDAG(subDagForSingleJar)
jar -> taskScheduler
}
unstashAll()
@@ -113,15 +144,20 @@ object JarScheduler{
}
def ready: Receive = {
+ // Notifies that a new DAG is coming.
case TransitToNewDag =>
context.become(waitForNewDag)
- case GetRequestDetails =>
+
+ case GetResourceRequestDetails =>
+
+ // Asks each TaskScheduler (one per jar) for its resource requests.
val result: Array[ResourceRequestDetail] = taskSchedulers.map { jarAndScheduler =>
val (jar, scheduler) = jarAndScheduler
ResourceRequestDetail(jar, scheduler.getResourceRequests())
}.toArray
LOG.info(s"GetRequestDetails " + result.mkString(";"))
sender ! result
+
case ScheduleTask(appJar, workerId, executorId, resource) =>
val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler =>
scheduler.schedule(workerId, executorId, resource)
@@ -130,9 +166,9 @@ object JarScheduler{
sender ! result
case ExecutorFailed(executorId) =>
val result: Option[ResourceRequestDetail] = taskSchedulers.
- find(_._2.scheduledTasks(executorId).nonEmpty).map{ jarAndScheduler =>
- ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId))
- }
+ find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler =>
+ ResourceRequestDetail(jarAndScheduler._1, jarAndScheduler._2.executorFailed(executorId))
+ }
LOG.info(s"ExecutorFailed " + result.mkString(";"))
sender ! result
}
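
JarSchedulerImpl above splits the application DAG by jar: dag.processors.values.groupBy(_.jar) yields one group per jar, processors already dead at startTime are filtered out, and each surviving group becomes the sub-DAG of one TaskScheduler. A minimal sketch of that grouping with placeholder types (AppJar and Processor below are simplified stand-ins):

    object SubDagPerJarSketch extends App {
      final case class AppJar(name: String)
      // Simplified processor: which jar it ships in and when its life ends.
      final case class Processor(id: Int, jar: AppJar, death: Long)

      val processors = List(
        Processor(0, AppJar("wordcount.jar"), death = Long.MaxValue),
        Processor(1, AppJar("udf.jar"), death = Long.MaxValue),
        Processor(2, AppJar("udf.jar"), death = 10L)) // already dead at startTime

      val startTime = 100L

      // One sub-DAG (here, a plain list) per jar, keeping only live processors.
      val subDags: Map[AppJar, List[Processor]] =
        processors.groupBy(_.jar).map { case (jar, ps) =>
          jar -> ps.filter(startTime < _.death)
        }

      subDags.foreach { case (jar, ps) =>
        println(s"${jar.name} -> processors ${ps.map(_.id).mkString(",")}")
      }
    }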
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
index cdcd3f6..3b2c8bf 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,16 @@
package io.gearpump.streaming.appmaster
-import io.gearpump.streaming.{ExecutorId, ProcessorId, LifeTime}
import io.gearpump._
import io.gearpump.cluster.AppMasterToMaster.AppMasterSummary
-import io.gearpump.cluster.{UserConfig, MasterToAppMaster}
import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
-import AppMaster.ExecutorBrief
+import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
+import io.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId}
import io.gearpump.util.Graph
import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+/** Stream application summary, used by the REST API */
case class StreamAppMasterSummary(
appType: String = "streaming",
appId: Int,
@@ -42,7 +43,7 @@ case class StreamAppMasterSummary(
dag: Graph[ProcessorId, String] = null,
executors: List[ExecutorBrief] = null,
processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, ProcessorSummary],
- // hiearachy level for each processor
+ // Hierarchy level for each processor
processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int],
historyMetricsConfig: HistoryMetricsConfig = null)
extends AppMasterSummary
@@ -50,11 +51,11 @@ case class StreamAppMasterSummary(
case class TaskCount(count: Int)
case class ProcessorSummary(
- id: ProcessorId,
- taskClass: String,
- parallelism : Int,
- description: String,
- taskConf: UserConfig,
- life: LifeTime,
- executors: List[ExecutorId],
- taskCount: Map[ExecutorId, TaskCount])
\ No newline at end of file
+ id: ProcessorId,
+ taskClass: String,
+ parallelism: Int,
+ description: String,
+ taskConf: UserConfig,
+ life: LifeTime,
+ executors: List[ExecutorId],
+ taskCount: Map[ExecutorId, TaskCount])
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
index c456a06..6e7ebc6 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,35 +17,38 @@
*/
package io.gearpump.streaming.appmaster
-import com.typesafe.config.{ConfigValueFactory, ConfigFactory, ConfigRenderOptions, Config}
-import TaskLocator.{Localities, WorkerLocality, NonLocality, Locality}
-import io.gearpump.WorkerId
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, ConfigValueFactory}
+
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.streaming.Constants
+import io.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, NonLocality, WorkerLocality}
import io.gearpump.streaming.task.TaskId
-import scala.util.Try
-import scala.collection.JavaConverters._
/**
* TaskLocator is used to decide which machine one task should run on.
*
- * User can specify config [[Constants.GEARPUMP_STREAMING_LOCALITIES]] to decide
- * to control which machine the task is running on.
+ * Users can specify the config [[io.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]]
+ * to control which machine a task runs on.
*/
class TaskLocator(appName: String, config: Config) {
private val taskLocalities: Map[TaskId, Locality] = loadTaskLocalities(config)
- def locateTask(taskId: TaskId) : Locality = {
+ /** Finds where a task belongs */
+ def locateTask(taskId: TaskId): Locality = {
taskLocalities.getOrElse(taskId, NonLocality)
}
- private def loadTaskLocalities(config: Config) : Map[TaskId, Locality] = {
- import Constants.GEARPUMP_STREAMING_LOCALITIES
- Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map {appConfig =>
+ private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = {
+ import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
+ Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map { appConfig =>
val json = appConfig.root().render(ConfigRenderOptions.concise)
Localities.fromJson(json)
}.map { localityConfig =>
import localityConfig.localities
- localities.keySet.flatMap {workerId =>
+ localities.keySet.flatMap { workerId =>
val tasks = localities(workerId)
tasks.map((_, WorkerLocality(workerId)))
}.toArray.toMap
@@ -57,15 +60,20 @@ object TaskLocator {
trait Locality
+ /** Means we require the resource from a specific worker */
case class WorkerLocality(workerId: WorkerId) extends Locality
+ /** Means no preference on worker */
object NonLocality extends Locality
+ /** Localities settings. Mapping from workerId to list of taskId */
case class Localities(localities: Map[WorkerId, Array[TaskId]])
object Localities {
val pattern = "task_([0-9]+)_([0-9]+)".r
+ // To avoid polluting the classpath, we do the JSON translation ourselves instead of
+ // introducing JSON library dependencies directly.
def fromJson(json: String): Localities = {
val localities = ConfigFactory.parseString(json).getAnyRef("localities")
.asInstanceOf[java.util.Map[String, String]].asScala.map { pair =>
@@ -80,8 +88,9 @@ object TaskLocator {
}
def toJson(localities: Localities): String = {
- val map = localities.localities.toList.map {pair =>
- (WorkerId.render(pair._1), pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(","))
+ val map = localities.localities.toList.map { pair =>
+ (WorkerId.render(pair._1), pair._2.map(task =>
+ s"task_${task.processorId}_${task.index}").mkString(","))
}.toMap.asJava
ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)).
root.render(ConfigRenderOptions.concise())
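
Localities.fromJson above expects a single "localities" object mapping a rendered worker id to a comma-separated list of task_<processorId>_<index> entries, parsed with the regex shown in the file. A small sketch of parsing that shape, assuming the com.typesafe.config library on the classpath ("worker-0" is an illustrative key; the real key format comes from WorkerId.render):

    import scala.collection.JavaConverters._

    import com.typesafe.config.ConfigFactory

    object LocalitiesJsonSketch extends App {
      // Illustrative payload in the shape Localities.fromJson consumes.
      val json = """{"localities": {"worker-0": "task_0_0,task_0_1"}}"""

      val pattern = "task_([0-9]+)_([0-9]+)".r
      val localities = ConfigFactory.parseString(json)
        .getAnyRef("localities").asInstanceOf[java.util.Map[String, String]].asScala

      localities.foreach { case (worker, tasks) =>
        tasks.split(",").foreach {
          case pattern(processorId, index) =>
            println(s"$worker hosts task(processorId=$processorId, index=$index)")
        }
      }
    }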
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
index e1f4872..51f2f9c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
@@ -18,12 +18,17 @@
package io.gearpump.streaming.appmaster
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
import akka.actor._
import akka.pattern.ask
+import org.slf4j.Logger
+
import io.gearpump.TimeStamp
import io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
import io.gearpump.streaming.AppMasterToExecutor._
-import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, RegisterTask}
+import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
import io.gearpump.streaming._
import io.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, LookupTaskActorRef, TaskActorRef}
import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess}
@@ -36,31 +41,26 @@ import io.gearpump.streaming.executor.ExecutorRestartPolicy
import io.gearpump.streaming.task._
import io.gearpump.streaming.util.ActorPathUtil
import io.gearpump.util.{Constants, LogUtil}
-import org.slf4j.Logger
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
/**
*
* TaskManager tracks the status of all tasks.
*
* It is a state machine with three states:
- * 1. applicationReady
- * 2. recovery
- * 3. dynamicDag
+ * 1. applicationReady
+ * 2. recovery
+ * 3. dynamicDag
*
* When in state applicationReady:
- * 1. When there is message-loss or JVM crash, transit to state recovery.
- * 2. When user modify the DAG, transit to dynamicDag.
+ * 1. When there is message loss or a JVM crash, it transits to state recovery.
+ * 2. When the user modifies the DAG, it transits to dynamicDag.
*
* When in state recovery:
- * 1. When all tasks has been recovered, transit to applicationReady.
+ * 1. When all tasks have been recovered, it transits to applicationReady.
*
* When in state dynamicDag:
- * 1. When dyanmic dag transition is complete, transit to applicationReady.
- * 2. When there is message loss or JVM crash, transit to state recovery.
- *
+ * 1. When the dynamic DAG transition is complete, it transits to applicationReady.
+ * 2. When there is message loss or a JVM crash, it transits to state recovery.
*/
private[appmaster] class TaskManager(
appId: Int,
@@ -73,13 +73,16 @@ private[appmaster] class TaskManager(
extends Actor {
private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
- val systemConfig = context.system.settings.config
+ private val systemConfig = context.system.settings.config
private val ids = new SessionIdFactory()
- private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5, withinTimeRange = 20 seconds)
- implicit val timeout = Constants.FUTURE_TIMEOUT
- implicit val actorSystem = context.system
+ private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries = 5,
+ withinTimeRange = 20.seconds)
+
+ private implicit val timeout = Constants.FUTURE_TIMEOUT
+ private implicit val actorSystem = context.system
+
import context.dispatcher
dagManager ! WatchChange(watcher = self)
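The three states described in the scaladoc above (applicationReady, dynamicDag, recovery) are plain Receive handlers swapped with context.become. A minimal, self-contained sketch of that pattern follows; the message names (NewDag, AllTasksRecovered, DagTransitionDone) are hypothetical stand-ins, not the actual TaskManager protocol.

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical stand-in messages; the real TaskManager reacts to MessageLoss,
// ReplaceProcessor, ExecutorStopped etc. as shown in the diff.
case object MessageLoss
case object NewDag
case object AllTasksRecovered
case object DagTransitionDone

class StateMachineSketch extends Actor {
  def receive: Receive = applicationReady

  def applicationReady: Receive = {
    case MessageLoss => context.become(recovery)   // message loss / JVM crash
    case NewDag      => context.become(dynamicDag) // user modified the DAG
  }

  def recovery: Receive = {
    case AllTasksRecovered => context.become(applicationReady)
  }

  def dynamicDag: Receive = {
    case DagTransitionDone => context.become(applicationReady)
    case MessageLoss       => context.become(recovery)
  }
}

object StateMachineSketch extends App {
  val system = ActorSystem("sketch")
  val taskManager = system.actorOf(Props[StateMachineSketch], "taskManager")
  taskManager ! NewDag            // applicationReady -> dynamicDag
  taskManager ! DagTransitionDone // dynamicDag -> applicationReady
  system.terminate()
}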
@@ -91,7 +94,6 @@ private[appmaster] class TaskManager(
private var startClock: Future[TimeStamp] = getStartClock
-
def receive: Receive = applicationReady(DagReadyState.empty)
private def onClientQuery(taskRegistry: TaskRegistry): Receive = {
@@ -104,14 +106,14 @@ private[appmaster] class TaskManager(
val requestor = sender
executorId.map { executorId =>
val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, taskId)
- context.actorSelection(taskPath).resolveOne(3 seconds).map { taskActorRef =>
+ context.actorSelection(taskPath).resolveOne(3.seconds).map { taskActorRef =>
requestor ! TaskActorRef(taskActorRef)
}
}
}
/**
- * state applicationReady
+ * State applicationReady
*/
def applicationReady(state: DagReadyState): Receive = {
executorManager ! state.taskRegistry.usedResource
@@ -121,7 +123,7 @@ private[appmaster] class TaskManager(
val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks,
deadTasks = state.taskRegistry.deadTasks)
-
+
val recoverState = new StartDagState(state.dag, recoverRegistry)
val onError: Receive = {
@@ -153,8 +155,8 @@ private[appmaster] class TaskManager(
val dagDiff = migrate(state.dag, newDag)
jarScheduler.setDag(newDag, startClock)
- val resourceRequestsDetails = jarScheduler.getRequestDetails()
- resourceRequestsDetails.map{ details =>
+ val resourceRequestsDetails = jarScheduler.getResourceRequestDetails()
+ resourceRequestsDetails.map { details =>
details.foreach { detail =>
if (detail.requests.length > 0 && detail.requests.exists(!_.resource.isEmpty)) {
executorManager ! StartExecutors(detail.requests, detail.jar)
@@ -168,7 +170,8 @@ private[appmaster] class TaskManager(
executors.foreach { pair =>
val (executorId, tasks) = pair
modifiedTasks ++= tasks
- dagManager ! GetTaskLaunchData(newDag.version, processorId, ChangeTasksOnExecutor(executorId, tasks))
+ dagManager ! GetTaskLaunchData(newDag.version, processorId,
+ ChangeTasksOnExecutor(executorId, tasks))
}
}
@@ -200,12 +203,13 @@ private[appmaster] class TaskManager(
context.become(applicationReady(newState))
}
- // recover to same version
- onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse onUnRegisterTask orElse unHandled("applicationReady")
+ // Recovers to the same version
+ onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse
+ onUnRegisterTask orElse unHandled("applicationReady")
}
/**
- * state dynamicDag
+ * State dynamicDag
*/
def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = {
LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...")
@@ -231,11 +235,13 @@ private[appmaster] class TaskManager(
case executor: ExecutorStarted =>
import executor.{boundedJar, executorId, resource, workerId}
val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, executorId, resource)
- taskIdsFuture.foreach {taskIds =>
- LOG.info(s"Executor $executor has been started, start to schedule tasks: ${taskIds.mkString(",")}")
+ taskIdsFuture.foreach { taskIds =>
+ LOG.info(s"Executor $executor has been started, " +
+ s"start to schedule tasks: ${taskIds.mkString(",")}")
taskIds.groupBy(_.processorId).foreach { pair =>
val (processorId, tasks) = pair
- dagManager ! GetTaskLaunchData(state.dag.version, processorId, StartTasksOnExecutor(executor.executorId, tasks))
+ dagManager ! GetTaskLaunchData(state.dag.version, processorId,
+ StartTasksOnExecutor(executor.executorId, tasks))
}
}
@@ -250,7 +256,8 @@ private[appmaster] class TaskManager(
tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _))
case ChangeTasksOnExecutor(executorId, tasks) =>
LOG.info("change Task on executor: " + executorId + ", tasks: " + tasks)
- val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life, subscribers)
+ val changeTasks = ChangeTasks(tasks, state.dag.version, processorDescription.life,
+ subscribers)
executorManager ! UniCast(executorId, changeTasks)
case other =>
LOG.error(s"severe error! we expect ExecutorStarted but get ${other.getClass.toString}")
@@ -258,7 +265,7 @@ private[appmaster] class TaskManager(
case TasksLaunched =>
// We will track all launched task by message RegisterTask
case TasksChanged(tasks) =>
- tasks.foreach(task =>state.taskChangeRegistry.taskChanged(task))
+ tasks.foreach(task => state.taskChangeRegistry.taskChanged(task))
if (allTasksReady(state)) {
broadcastLocations(state)
@@ -306,10 +313,11 @@ private[appmaster] class TaskManager(
def onExecutorError: Receive = {
case ExecutorStopped(executorId) =>
- if(executorRestartPolicy.allowRestartExecutor(executorId)) {
+ if (executorRestartPolicy.allowRestartExecutor(executorId)) {
jarScheduler.executorFailed(executorId).foreach { resourceRequestDetail =>
if (resourceRequestDetail.isDefined) {
- executorManager ! StartExecutors(resourceRequestDetail.get.requests, resourceRequestDetail.get.jar)
+ executorManager ! StartExecutors(resourceRequestDetail.get.requests,
+ resourceRequestDetail.get.jar)
}
}
} else {
@@ -330,7 +338,7 @@ private[appmaster] class TaskManager(
}
/**
- * state recovery
+ * State recovery
*/
def recovery(state: StartDagState): Receive = {
val recoverDagVersion = state.dag.version
@@ -344,7 +352,7 @@ private[appmaster] class TaskManager(
LOG.info(s"goto state Recovery(recoverDag = $recoverDagVersion)...")
val ignoreClock: Receive = {
case clock: ClockEvent =>
- //ignore clock events.
+ // Ignores clock events.
}
if (state.dag.isEmpty) {
@@ -354,7 +362,8 @@ private[appmaster] class TaskManager(
deadTasks = state.taskRegistry.deadTasks)
val recoverState = new StartDagState(state.dag, registry)
- ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse unHandled("recovery")
+ ignoreClock orElse startDag(state, recoverState) orElse onExecutorError orElse
+ unHandled("recovery")
}
}
@@ -364,19 +373,17 @@ private[appmaster] class TaskManager(
}
}
-private [appmaster] object TaskManager {
+private[appmaster] object TaskManager {
/**
* When the application is ready, it transits to DagReadyState
*/
- class DagReadyState(
- val dag: DAG,
- val taskRegistry: TaskRegistry)
+ class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry)
object DagReadyState {
def empty: DagReadyState = {
new DagReadyState(
- DAG.empty().copy(version = -1),
+ DAG.empty.copy(version = -1),
new TaskRegistry(List.empty[TaskId]))
}
}
@@ -385,10 +392,10 @@ private [appmaster] object TaskManager {
* When the application is booting up or recovering, it uses StartDagState
*/
class StartDagState(
- val dag: DAG,
- val taskRegistry: TaskRegistry,
- val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]),
- val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry)
+ val dag: DAG,
+ val taskRegistry: TaskRegistry,
+ val taskChangeRegistry: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId]),
+ val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry)
case object GetTaskList
@@ -397,18 +404,17 @@ private [appmaster] object TaskManager {
case class FailedToRecover(errorMsg: String)
/**
- * Start new Tasks on Executor <executorId>
+ * Starts new tasks on executor executorId
*/
case class StartTasksOnExecutor(executorId: Int, tasks: List[TaskId])
/**
- * Change existing tasks on executor <executorId>
+ * Changes existing tasks on executor executorId
*/
case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId])
-
/**
- * Track the registration of all new started executors.
+ * Tracks the registration of all new started executors.
*/
class ExecutorRegistry {
private var registeredExecutors = Set.empty[ExecutorId]
@@ -423,7 +429,7 @@ private [appmaster] object TaskManager {
}
/**
- * Track the registration of all changed tasks.
+ * Tracks the registration of all changed tasks.
*/
class TaskChangeRegistry(targetTasks: List[TaskId]) {
private var registeredTasks = Set.empty[TaskId]
@@ -443,12 +449,12 @@ private [appmaster] object TaskManager {
* DAGDiff is used to track impacted processors when doing dynamic dag.
*/
case class DAGDiff(
- addedProcessors: List[ProcessorId],
- modifiedProcessors: List[ProcessorId],
- impactedUpstream: List[ProcessorId])
+ addedProcessors: List[ProcessorId],
+ modifiedProcessors: List[ProcessorId],
+ impactedUpstream: List[ProcessorId])
/**
- * Migrate from old DAG to new DAG, return DAGDiff
+ * Migrates from the old DAG to the new DAG, returning a DAGDiff
*/
def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = {
val left = leftDAG.processors.keySet
@@ -457,19 +463,19 @@ private [appmaster] object TaskManager {
val added = right -- left
val join = right -- added
- val modified = join.filter {processorId =>
+ val modified = join.filter { processorId =>
leftDAG.processors(processorId) != rightDAG.processors(processorId)
}
val upstream = (list: Set[ProcessorId]) => {
- list.flatMap {processorId =>
+ list.flatMap { processorId =>
rightDAG.graph.incomingEdgesOf(processorId).map(_._1).toSet
} -- list
}
val impactedUpstream = upstream(added ++ modified)
- // all upstream will be affected.
+ // All upstream processors are affected and should be handled properly.
DAGDiff(added.toList, modified.toList, impactedUpstream.toList)
}
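The migrate method above is plain set arithmetic over processor ids. Here is a self-contained sketch of that arithmetic, with plain Maps standing in for DAG.processors and an edge set standing in for the DAG graph (all names below are stand-ins, not the Gearpump API):

object MigrateSketch {
  type ProcessorId = Int

  def migrate(
      left: Map[ProcessorId, String], right: Map[ProcessorId, String],
      rightEdges: Set[(ProcessorId, ProcessorId)])
    : (Set[ProcessorId], Set[ProcessorId], Set[ProcessorId]) = {
    val added = right.keySet -- left.keySet
    val join = right.keySet -- added
    // A processor is "modified" when its description changed between DAGs.
    val modified = join.filter(id => left(id) != right(id))
    // Upstream of a processor set: sources of edges pointing into the set.
    val upstream = (set: Set[ProcessorId]) =>
      rightEdges.collect { case (from, to) if set(to) => from } -- set
    (added, modified, upstream(added ++ modified))
  }

  def main(args: Array[String]): Unit = {
    val old = Map(0 -> "source", 1 -> "map", 2 -> "sink")
    val updated = Map(0 -> "source", 1 -> "flatMap", 2 -> "sink", 3 -> "sink2")
    val edges = Set(0 -> 1, 1 -> 2, 1 -> 3)
    // added = Set(3), modified = Set(1), impactedUpstream = Set(0)
    println(migrate(old, updated, edges))
  }
}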
@@ -480,7 +486,7 @@ private [appmaster] object TaskManager {
private var nextSessionId = 1
/**
- * return a new session Id for new task
+ * Returns a new session Id for a new task
*/
final def newSessionId: Int = {
val sessionId = nextSessionId
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
index 79371ea..adfdeba 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
@@ -18,18 +18,18 @@
package io.gearpump.streaming.appmaster
-import io.gearpump.streaming.{ExecutorId, ProcessorId}
-import io.gearpump.streaming.task.TaskId
+import org.slf4j.Logger
+
import io.gearpump.cluster.scheduler.Resource
-import ExecutorManager.ExecutorResourceUsageSummary
-import TaskRegistry._
+import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
+import io.gearpump.streaming.appmaster.TaskRegistry._
+import io.gearpump.streaming.task.TaskId
+import io.gearpump.streaming.{ExecutorId, ProcessorId}
import io.gearpump.transport.HostPort
import io.gearpump.util.LogUtil
-import org.slf4j.Logger
/**
- * TaskRegistry is used to track the registration of all tasks
- * when one application is booting up.
+ * Tracks the registration of all tasks when the application is booting up.
*/
class TaskRegistry(val expectedTasks: List[TaskId],
var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, TaskLocation],
@@ -43,6 +43,10 @@ class TaskRegistry(val expectedTasks: List[TaskId],
* When a task is booted, it needs to call registerTask to register itself.
* If the task is valid, it is accepted; otherwise it is rejected.
*
+ * @param taskId Task that registers itself to the TaskRegistry.
+ * @param location The host and port this task is running on. NOTE: this is NOT the
+ * host and port of Akka remoting; it is the host and port of the custom
+ * Netty layer, see [[io.gearpump.transport.netty.Context]].
*/
def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus = {
val processorId = taskId.processorId
@@ -57,13 +61,13 @@ class TaskRegistry(val expectedTasks: List[TaskId],
}
def copy(expectedTasks: List[TaskId] = this.expectedTasks,
- registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
- deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
+ registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
+ deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
}
def getTaskLocations: TaskLocations = {
- val taskLocations = registeredTasks.toList.groupBy(_._2.host).map{ pair =>
+ val taskLocations = registeredTasks.toList.groupBy(_._2.host).map { pair =>
val (k, v) = pair
val taskIds = v.map(_._1)
(k, taskIds.toSet)
@@ -74,16 +78,18 @@ class TaskRegistry(val expectedTasks: List[TaskId],
def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
getTaskLocations.locations.flatMap { pair =>
val (hostPort, taskSet) = pair
- taskSet.map{ taskId =>
+ taskSet.map { taskId =>
(taskId, getExecutorId(taskId).getOrElse(-1))
}
}
}
+ /** Queries the executor Id the task is running on */
def getExecutorId(taskId: TaskId): Option[Int] = {
registeredTasks.get(taskId).map(_.executorId)
}
+ /** Gets the list of allocated executor Ids */
def executors: List[ExecutorId] = {
registeredTasks.toList.map(_._2.executorId)
}
@@ -101,6 +107,7 @@ class TaskRegistry(val expectedTasks: List[TaskId],
registeredTasks.keys.toList.filter(_.processorId == processorId)
}
+ /** Lists the executors that the given processor's tasks are running on */
def processorExecutors(processorId: ProcessorId): Map[ExecutorId, List[TaskId]] = {
val taskToExecutor = filterTasks(processorId).flatMap { taskId =>
getExecutorId(taskId).map { executorId =>
@@ -108,15 +115,16 @@ class TaskRegistry(val expectedTasks: List[TaskId],
}
}
- val executorToTasks = taskToExecutor.groupBy(_._2).map{kv =>
+ val executorToTasks = taskToExecutor.groupBy(_._2).map { kv =>
val (k, v) = kv
(k, v.map(_._1))
}
executorToTasks
}
+ /** Summarizes how many resources are used by all running tasks */
def usedResource: ExecutorResourceUsageSummary = {
- val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) {(map, task) =>
+ val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, Resource]) { (map, task) =>
val resource = map.getOrElse(task._2.executorId, Resource(0)) + Resource(1)
map + (task._2.executorId -> resource)
}
@@ -131,5 +139,5 @@ object TaskRegistry {
case class TaskLocation(executorId: Int, host: HostPort)
- case class TaskLocations(locations : Map[HostPort, Set[TaskId]])
+ case class TaskLocations(locations: Map[HostPort, Set[TaskId]])
}
\ No newline at end of file
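To illustrate the registry bookkeeping above (registerTask accepting only expected tasks, and usedResource folding one slot per registered task into a per-executor summary), here is a minimal, self-contained sketch. TaskId and Resource are simplified stand-ins for the Gearpump types; HostPort locations and dead-task tracking are omitted.

object TaskRegistrySketch {
  case class TaskId(processorId: Int, index: Int)
  case class Resource(slots: Int) { def +(o: Resource): Resource = Resource(slots + o.slots) }

  class Registry(expected: List[TaskId]) {
    private var registered = Map.empty[TaskId, Int] // taskId -> executorId

    /** Accepts only tasks that were expected; mirrors registerTask above. */
    def register(taskId: TaskId, executorId: Int): Boolean = {
      if (expected.contains(taskId)) { registered += taskId -> executorId; true }
      else false
    }

    /** One slot per registered task, folded per executor, as in usedResource. */
    def usedResource: Map[Int, Resource] =
      registered.foldLeft(Map.empty[Int, Resource]) { (map, kv) =>
        val r = map.getOrElse(kv._2, Resource(0)) + Resource(1)
        map + (kv._2 -> r)
      }
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry(List(TaskId(0, 0), TaskId(0, 1)))
    registry.register(TaskId(0, 0), executorId = 1)
    registry.register(TaskId(0, 1), executorId = 1)
    registry.register(TaskId(9, 9), executorId = 2) // rejected: not expected
    println(registry.usedResource) // Map(1 -> Resource(2))
  }
}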
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
index 39f427a..62dff6c 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,19 @@
package io.gearpump.streaming.appmaster
import com.typesafe.config.Config
-import io.gearpump.{WorkerId, TimeStamp}
+
import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
import io.gearpump.streaming.DAG
import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.{Constants, LogUtil}
-import org.slf4j.Logger
+import io.gearpump.util.Constants
/**
- * This schedules tasks to run for new allocated resources.
+ * Schedules tasks to run on newly allocated resources. TaskScheduler only schedules tasks
+ * that share the same jar. For scheduling across multiple jars, see
+ * [[io.gearpump.streaming.appmaster.JarScheduler]].
*/
trait TaskScheduler {
@@ -44,8 +46,8 @@ trait TaskScheduler {
def getResourceRequests(): Array[ResourceRequest]
/**
- * This notify the scheduler that a resource slot on {workerId} and {executorId} is allocated, and
- * expect a task to be scheduled in return.
+ * This notifies the scheduler that a resource slot on {workerId} and {executorId} is
+ * allocated, and expects a task to be scheduled in return.
* Task locality should be considered when deciding whether to offer a task on target {worker}
* and {executor}
*
@@ -53,24 +55,24 @@ trait TaskScheduler {
* @param executorId which executorId this resource belongs to.
* @return a list of tasks
*/
- def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId]
+ def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId]
/**
- * This notify the scheduler that {executorId} is failed, and expect a set of
+ * This notifies the scheduler that {executorId} has failed, and expects a set of
* ResourceRequest for all failed tasks on that executor.
*
* @param executorId executor that failed
* @return resource requests of the failed executor
*/
- def executorFailed(executorId: Int) : Array[ResourceRequest]
+ def executorFailed(executorId: Int): Array[ResourceRequest]
/**
- * Query the task list that already scheduled on the executor
- *
- * @param executorId executor to query
- * @return a list of tasks
- */
- def scheduledTasks(executorId: Int) : List[TaskId]
+ * Queries the list of tasks already scheduled on the executor
+ *
+ * @param executorId executor to query
+ * @return a list of tasks
+ */
+ def scheduledTasks(executorId: Int): List[TaskId]
}
object TaskScheduler {
@@ -79,12 +81,12 @@ object TaskScheduler {
class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location)
}
-class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends TaskScheduler {
+class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends TaskScheduler {
private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)
private var tasks = List.empty[TaskStatus]
- // find the locality of the tasks
+ // Finds the locality of the tasks
private val taskLocator = new TaskLocator(appName, config)
override def setDAG(dag: DAG): Unit = {
@@ -96,15 +98,15 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T
}
}
- def getResourceRequests(): Array[ResourceRequest] ={
+ def getResourceRequests(): Array[ResourceRequest] = {
fetchResourceRequests(fromOneWorker = false)
}
- import Relaxation._
- private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] ={
+ import io.gearpump.cluster.scheduler.Relaxation._
+ private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] = {
var workersResourceRequest = Map.empty[WorkerId, Resource]
- tasks.filter(_.allocation == null).foreach{task =>
+ tasks.filter(_.allocation == null).foreach { task =>
task.preferLocality match {
case WorkerLocality(workerId) =>
val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
@@ -116,7 +118,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T
}
}
- workersResourceRequest.map {workerIdAndResource =>
+ workersResourceRequest.map { workerIdAndResource =>
val (workerId, resource) = workerIdAndResource
if (workerId == WorkerId.unspecified) {
ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum)
@@ -126,23 +128,26 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T
}.toArray
}
- override def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId] = {
+ override def schedule(workerId: WorkerId, executorId: Int, resource: Resource): List[TaskId] = {
var scheduledTasks = List.empty[TaskId]
val location = Location(workerId, executorId)
- // schedule tasks for specific worker
+ // Schedules tasks for a specific worker
scheduledTasks ++= scheduleTasksForLocality(resource, location,
(locality) => locality == WorkerLocality(workerId))
- // schedule tasks without specific location preference
- scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length), location, (locality) => true)
+ // Schedules tasks without specific location preference
+ scheduledTasks ++= scheduleTasksForLocality(resource - Resource(scheduledTasks.length),
+ location, (locality) => true)
scheduledTasks
}
- private def scheduleTasksForLocality(resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean): List[TaskId] = {
+ private def scheduleTasksForLocality(
+ resource: Resource, resourceLocation: Location, matcher: (Locality) => Boolean)
+ : List[TaskId] = {
var scheduledTasks = List.empty[TaskId]
var index = 0
var remain = resource.slots
- while(index < tasks.length && remain > 0) {
+ while (index < tasks.length && remain > 0) {
val taskStatus = tasks(index)
if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) {
taskStatus.allocation = resourceLocation
@@ -154,18 +159,19 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T
scheduledTasks
}
- override def executorFailed(executorId: Int) : Array[ResourceRequest] = {
+ override def executorFailed(executorId: Int): Array[ResourceRequest] = {
val failedTasks = tasks.filter { status =>
status.allocation != null && status.allocation.executorId == executorId
}
- // clean the location of failed tasks
+ // Clears the locations of failed tasks
failedTasks.foreach(_.allocation = null)
- Array(ResourceRequest(Resource(failedTasks.length), workerId = WorkerId.unspecified, relaxation = ONEWORKER))
+ Array(ResourceRequest(Resource(failedTasks.length),
+ workerId = WorkerId.unspecified, relaxation = ONEWORKER))
}
override def scheduledTasks(executorId: Int): List[TaskId] = {
- tasks.filter{ status =>
+ tasks.filter { status =>
status.allocation != null && status.allocation.executorId == executorId
}.map(_.taskId)
}
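The schedule method above makes two passes over the pending tasks: it first fills the offered slots with tasks that prefer this specific worker, then fills any remaining slots with tasks that have no locality preference. A minimal, self-contained sketch of that two-pass assignment follows; the types are simplified stand-ins for the Gearpump ones.

object SchedulerSketch {
  sealed trait Locality
  case class WorkerLocality(workerId: Int) extends Locality
  case object NonLocality extends Locality

  case class TaskStatus(taskId: Int, locality: Locality, var allocated: Boolean = false)

  def schedule(tasks: List[TaskStatus], workerId: Int, slots: Int): List[Int] = {
    // Assigns up to `remaining` unallocated tasks whose locality matches.
    def fill(remaining: Int, matches: Locality => Boolean): List[Int] =
      tasks.iterator
        .filter(t => !t.allocated && matches(t.locality))
        .take(remaining)
        .map { t => t.allocated = true; t.taskId }
        .toList

    val local = fill(slots, _ == WorkerLocality(workerId)) // pass 1: worker-local
    local ++ fill(slots - local.length, _ => true)         // pass 2: anything left
  }

  def main(args: Array[String]): Unit = {
    val tasks = List(
      TaskStatus(1, WorkerLocality(7)),
      TaskStatus(2, NonLocality),
      TaskStatus(3, WorkerLocality(7)))
    println(schedule(tasks, workerId = 7, slots = 2)) // List(1, 3)
  }
}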