You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/10 15:06:49 UTC
incubator-gearpump git commit: [GEARPUMP-291] Refactor TaskActor
Repository: incubator-gearpump
Updated Branches:
refs/heads/master ae55efbdd -> 0bc65218f
[GEARPUMP-291] Refactor TaskActor
Author: manuzhang <ow...@gmail.com>
Closes #170 from manuzhang/refactor_task.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/0bc65218
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/0bc65218
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/0bc65218
Branch: refs/heads/master
Commit: 0bc65218f39962cf237d4b56b95c06ce2536ea1c
Parents: ae55efb
Author: manuzhang <ow...@gmail.com>
Authored: Fri Mar 10 23:02:18 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Mar 10 23:06:28 2017 +0800
----------------------------------------------------------------------
.../streaming/appmaster/ClockService.scala | 14 ++-
.../gearpump/streaming/task/TaskActor.scala | 113 +++++++++++--------
.../streaming/task/TaskControlMessage.scala | 4 +-
.../streaming/appmaster/AppMasterSpec.scala | 7 +-
.../streaming/appmaster/ClockServiceSpec.scala | 5 +-
5 files changed, 88 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index b1f0b23..77a966a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -37,7 +37,6 @@ import org.apache.gearpump.streaming.task._
import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger
-import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.language.implicitConversions
@@ -194,7 +193,7 @@ class ClockService(
private def getUpstreamClocks(
clocks: Map[ProcessorId, ProcessorClock]): Map[ProcessorId, Array[ProcessorClock]] = {
clocks.foldLeft(Map.empty[ProcessorId, Array[ProcessorClock]]) {
- case (accum, (processorId, clock)) =>
+ case (accum, (processorId, _)) =>
val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
if (upstreams.nonEmpty) {
val upstreamClocks = upstreams.collect(clocks)
@@ -215,7 +214,7 @@ class ClockService(
def clockService: Receive = {
case GetUpstreamMinClock(task) =>
- getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
+ sendBackUpstreamMinClock(sender, task)
case UpdateClock(task, clock) =>
val processorClock = clocks.get(task.processorId)
@@ -225,10 +224,9 @@ class ClockService(
LOG.error(s"Cannot updateClock for task $task")
}
if (Instant.ofEpochMilli(minClock).equals(Watermark.MAX)) {
- healthCheckScheduler.cancel()
appMaster ! EndingClock
} else {
- getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
+ sendBackUpstreamMinClock(sender, task)
}
case GetLatestMinClock =>
@@ -263,7 +261,7 @@ class ClockService(
case GetCheckpointClock =>
sender ! CheckpointClock(minCheckpointClock)
- case getStalling: GetStallingTasks =>
+ case GetStallingTasks =>
sender ! StallingTasks(healthChecker.getReport.stallingTasks)
case ChangeToNewDAG(newDag) =>
@@ -282,6 +280,10 @@ class ClockService(
})
}
+ private def sendBackUpstreamMinClock(sender: ActorRef, task: TaskId): Unit = {
+ sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
+ }
+
private def removeProcessor(processorId: ProcessorId): Unit = {
clocks = clocks - processorId
processorClocks = processorClocks.filter(_.processorId != processorId)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 318ebf8..10648b4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -30,14 +30,19 @@ import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.metrics.Metrics
import org.apache.gearpump.serializer.SerializationFramework
import org.apache.gearpump.streaming.AppMasterToExecutor._
+import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.ExecutorToAppMaster._
import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.task.TaskActor._
import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+
/**
- *
- * All tasks of Gearpump runs inside a Actor. TaskActor is the Actor container for a task.
+ * All tasks of Gearpump run inside an Actor. TaskActor is the Actor container for a task.
*/
class TaskActor(
val taskId: TaskId,
@@ -48,20 +53,17 @@ class TaskActor(
extends Actor with ExpressTransport with TimeOutScheduler {
private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS
private var _minClock: TimeStamp = MIN_TIME_MILLIS
- private var minClockReported: Boolean = true
def serializerPool: SerializationFramework = inputSerializerPool
- import taskContextData._
-
- import org.apache.gearpump.streaming.Constants._
- import org.apache.gearpump.streaming.task.TaskActor._
- val config = context.system.settings.config
+ private val config = context.system.settings.config
- val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
+ val LOG: Logger = LogUtil.getLogger(getClass,
+ app = taskContextData.appId, executor = taskContextData.executorId, task = taskId)
// Metrics
- private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
+ private val metricName =
+ s"app${taskContextData.appId}.processor${taskId.processorId}.task${taskId.index}"
private val receiveLatency = Metrics(context.system).histogram(
s"$metricName:receiveLatency", sampleRate = 1)
private val processTime = Metrics(context.system).histogram(s"$metricName:processTime")
@@ -76,7 +78,6 @@ class TaskActor(
private var life = taskContextData.life
// Latency probe
- import scala.concurrent.duration._
import context.dispatcher
final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
@@ -137,7 +138,7 @@ class TaskActor(
}
final override def preStart(): Unit = {
- val register = RegisterTask(taskId, executorId, local)
+ val register = RegisterTask(taskId, taskContextData.executorId, local)
LOG.info(s"$register")
executor ! register
context.become(waitForTaskRegistered)
@@ -175,15 +176,14 @@ class TaskActor(
private def onStartClock(): Unit = {
LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId")
- subscriptions = subscribers.map { subscriber =>
+ subscriptions = taskContextData.subscribers.map { subscriber =>
(subscriber.processorId,
- new Subscription(appId, executorId, taskId, subscriber, sessionId, this,
- maxPendingMessageCount, ackOnceEveryMessageCount))
+ new Subscription(taskContextData.appId, taskContextData.executorId, taskId, subscriber,
+ sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount))
}.sortBy(_._1)
subscriptions.foreach(_._2.start())
- import scala.collection.JavaConverters._
stashQueue.asScala.foreach { item =>
handleMessages(item.sender).apply(item.msg)
}
@@ -194,13 +194,13 @@ class TaskActor(
// target
onStart(Instant.ofEpochMilli(_minClock))
- appMaster ! GetUpstreamMinClock(taskId)
+ taskContextData.appMaster ! GetUpstreamMinClock(taskId)
context.become(handleMessages(sender))
}
def waitForTaskRegistered: Receive = {
- case start@TaskRegistered(_, sessionId, startClock) =>
- this.sessionId = sessionId
+ case TaskRegistered(_, id, startClock) =>
+ this.sessionId = id
this._minClock = startClock
context.become(waitForStartClock)
}
@@ -208,7 +208,8 @@ class TaskActor(
private val stashQueue = new util.LinkedList[MessageAndSender]()
def waitForStartClock: Receive = {
- case start: StartTask =>
+ case start@StartTask(tid) =>
+ assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}")
onStartClock()
case other: AnyRef =>
stashQueue.add(MessageAndSender(other, sender()))
@@ -221,6 +222,7 @@ class TaskActor(
queue.add(SendAck(ackResponse, ackRequest.taskId))
doHandleMessage()
}
+
case ackRequest: AckRequest =>
// Enqueue to handle the ackRequest and send back ack later
val ackResponse = securityChecker.generateAckResponse(ackRequest, sender,
@@ -229,36 +231,45 @@ class TaskActor(
queue.add(SendAck(ackResponse, ackRequest.taskId))
doHandleMessage()
}
+
case ack: Ack =>
subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack))
doHandleMessage()
+
case inputMessage: SerializedMessage =>
val message = Message(serializerPool.get().deserialize(inputMessage.bytes),
inputMessage.timeStamp)
receiveMessage(message, sender)
+
case inputMessage: Message =>
receiveMessage(inputMessage, sender)
- case watermark@Watermark(instant) =>
- if (self.eq(sender) && minClockReported) {
- updateUpstreamMinClock(instant.toEpochMilli)
- minClockReported = false
- }
+ case watermark@Watermark(instant) =>
+ assert(sender.eq(self), "Watermark should only be sent from Task to itself")
+ onUpstreamMinClock(instant.toEpochMilli)
receiveMessage(watermark.toMessage, sender)
case UpstreamMinClock(upstreamClock) =>
- updateUpstreamMinClock(upstreamClock)
-
- case ChangeTask(_, dagVersion, life, subscribers) =>
- this.life = life
+ // 1. Received from ClockService; this task reports its minClock back after
+ // CLOCK_REPORT_INTERVAL, and ClockService then replies with another update, and so on.
+ // 2. The loop is kicked off by GetUpstreamMinClock on start.
+ // 3. upstreamClock is None for a source task, since its clock is reported by the
+ // external source as a watermark (handled in the Watermark case above).
+ // 4. This is designed to avoid flooding the ClockService.
+ upstreamClock.foreach(onUpstreamMinClock)
+ reportMinClock()
+
+ case ChangeTask(_, dagVersion, newLife, subscribers) =>
+ this.life = newLife
subscribers.foreach { subscriber =>
val processorId = subscriber.processorId
val subscription = getSubscription(processorId)
subscription match {
- case Some(subscription) =>
- subscription.changeLife(subscriber.lifeTime cross this.life)
+ case Some(subs) =>
+ subs.changeLife(subscriber.lifeTime cross this.life)
case None =>
- val subscription = new Subscription(appId, executorId, taskId, subscriber,
+ val subscription = new Subscription(taskContextData.appId,
+ taskContextData.executorId, taskId, subscriber,
sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)
subscription.start()
subscriptions :+=(subscriber.processorId, subscription)
@@ -267,11 +278,13 @@ class TaskActor(
}
}
sender ! TaskChanged(taskId, dagVersion)
+
case LatencyProbe(timeStamp) =>
receiveLatency.update(System.currentTimeMillis() - timeStamp)
- case send: SendMessageLoss =>
- LOG.info("received SendMessageLoss")
+
+ case SendMessageLoss =>
throw new MsgLostException
+
case other: AnyRef =>
queue.add(other)
doHandleMessage()
@@ -310,7 +323,19 @@ class TaskActor(
subscriptions.find(_._1 == processorId).map(_._2)
}
- private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
+ /**
+ * On receiving upstream min clock, this task will
+ *
+ * 1. update its upstreamMinClock and trigger watermark progress method
+ * if the new value is larger
+ * 2. update its own min clock
+ * 3. check for its own lifetime
+ *
+ * @param upstreamClock for DataSourceTask, the clock comes from the task itself via
+ * DataSource.getWatermark; for other tasks, it is the clock that
+ * upstream tasks reported to ClockService
+ */
+ private def onUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
if (upstreamClock > this.upstreamMinClock) {
this.upstreamMinClock = upstreamClock
task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
@@ -328,22 +353,22 @@ class TaskActor(
_minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
- val update = UpdateClock(taskId, _minClock)
- context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
- appMaster ! update
- minClockReported = true
- }
-
-
// Checks whether current task is dead.
if (_minClock > life.death) {
// There will be no more message received...
- val unRegister = UnRegisterTask(taskId, executorId)
+ val unRegister = UnRegisterTask(taskId, taskContextData.executorId)
executor ! unRegister
LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
}
}
+
+ private def reportMinClock(): Unit = {
+ val update = UpdateClock(taskId, _minClock)
+ context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
+ taskContextData.appMaster ! update
+ }
+ }
}
object TaskActor {
@@ -412,7 +437,7 @@ object TaskActor {
case object FLUSH
- val NONE_SESSION = -1
+ val NONE_SESSION: Int = -1
case class MessageAndSender(msg: AnyRef, sender: ActorRef)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index 73cd5af..c2e2faa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -54,7 +54,7 @@ case object GetCheckpointClock extends ClockEvent
case class CheckpointClock(clock: Option[TimeStamp])
-case class UpstreamMinClock(latestMinClock: TimeStamp)
+case class UpstreamMinClock(latestMinClock: Option[TimeStamp])
case class LatestMinClock(clock: TimeStamp)
@@ -67,7 +67,7 @@ case object EndingClock
/** Probe the latency between two upstream to downstream tasks. */
case class LatencyProbe(timestamp: Long)
-case class SendMessageLoss()
+case object SendMessageLoss
case object GetDAG
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index c1791aa..4faa058 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -154,6 +154,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
// clock status: task(0,0) -> 1, task(0,1)->0, task(1,0)->0, task(1,1)->0
appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
+ mockTask.expectMsg(UpstreamMinClock(None))
// check min clock
appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -161,6 +162,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
// clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
+ mockTask.expectMsg(UpstreamMinClock(None))
// check min clock
appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -170,7 +172,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
// Min clock of processor 0 (Task(0, 0) and Task(0, 1))
- mockTask.expectMsg(UpstreamMinClock(1))
+ mockTask.expectMsg(UpstreamMinClock(Some(1)))
// check min clock
appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -180,7 +182,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
// min clock of processor 0 (Task(0, 0) and Task(0, 1))
- mockTask.expectMsg(UpstreamMinClock(1))
+ mockTask.expectMsg(UpstreamMinClock(Some(1)))
// check min clock
appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -228,6 +230,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
for (i <- 1 to 5) {
val taskId = TaskId(0, 0)
appMaster.tell(UpdateClock(taskId, i), mockTask.ref)
+ mockTask.expectMsgType[UpstreamMinClock]
val cause = s"message loss $i from $taskId"
appMaster.tell(MessageLoss(0, taskId, cause), mockTask.ref)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index 4b824e0..ae78980 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -53,11 +53,13 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
val startClock = 100L
store.put(ClockService.START_CLOCK, startClock)
val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store)))
+
clockService ! GetLatestMinClock
expectMsg(LatestMinClock(startClock))
// task(0,0): clock(101); task(1,0): clock(100)
clockService ! UpdateClock(TaskId(0, 0), 101)
+ expectMsg(UpstreamMinClock(None))
// Min clock is updated
clockService ! GetLatestMinClock
@@ -67,7 +69,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
clockService ! UpdateClock(TaskId(1, 0), 101)
// Upstream is Task(0, 0), 101
- expectMsg(UpstreamMinClock(101))
+ expectMsg(UpstreamMinClock(Some(101)))
// Min clock is updated
clockService ! GetLatestMinClock
@@ -121,6 +123,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
clockService ! UpdateClock(TaskId(0, 0), 200L)
clockService ! UpdateClock(TaskId(1, 0), 200L)
expectMsgType[UpstreamMinClock]
+ expectMsgType[UpstreamMinClock]
clockService ! GetStartClock
expectMsg(StartClock(200L))