You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/10 15:06:49 UTC

incubator-gearpump git commit: [GEARPUMP-291] Refactor TaskActor

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master ae55efbdd -> 0bc65218f


[GEARPUMP-291] Refactor TaskActor

Author: manuzhang <ow...@gmail.com>

Closes #170 from manuzhang/refactor_task.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/0bc65218
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/0bc65218
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/0bc65218

Branch: refs/heads/master
Commit: 0bc65218f39962cf237d4b56b95c06ce2536ea1c
Parents: ae55efb
Author: manuzhang <ow...@gmail.com>
Authored: Fri Mar 10 23:02:18 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Mar 10 23:06:28 2017 +0800

----------------------------------------------------------------------
 .../streaming/appmaster/ClockService.scala      |  14 ++-
 .../gearpump/streaming/task/TaskActor.scala     | 113 +++++++++++--------
 .../streaming/task/TaskControlMessage.scala     |   4 +-
 .../streaming/appmaster/AppMasterSpec.scala     |   7 +-
 .../streaming/appmaster/ClockServiceSpec.scala  |   5 +-
 5 files changed, 88 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index b1f0b23..77a966a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -37,7 +37,6 @@ import org.apache.gearpump.streaming.task._
 import org.apache.gearpump.util.LogUtil
 import org.slf4j.Logger
 
-import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import scala.language.implicitConversions
 
@@ -194,7 +193,7 @@ class ClockService(
   private def getUpstreamClocks(
       clocks: Map[ProcessorId, ProcessorClock]): Map[ProcessorId, Array[ProcessorClock]] = {
     clocks.foldLeft(Map.empty[ProcessorId, Array[ProcessorClock]]) {
-      case (accum, (processorId, clock)) =>
+      case (accum, (processorId, _)) =>
         val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
         if (upstreams.nonEmpty) {
           val upstreamClocks = upstreams.collect(clocks)
@@ -215,7 +214,7 @@ class ClockService(
 
   def clockService: Receive = {
     case GetUpstreamMinClock(task) =>
-      getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
+      sendBackUpstreamMinClock(sender, task)
 
     case UpdateClock(task, clock) =>
       val processorClock = clocks.get(task.processorId)
@@ -225,10 +224,9 @@ class ClockService(
         LOG.error(s"Cannot updateClock for task $task")
       }
       if (Instant.ofEpochMilli(minClock).equals(Watermark.MAX)) {
-        healthCheckScheduler.cancel()
         appMaster ! EndingClock
       } else {
-        getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
+        sendBackUpstreamMinClock(sender, task)
       }
 
     case GetLatestMinClock =>
@@ -263,7 +261,7 @@ class ClockService(
     case GetCheckpointClock =>
       sender ! CheckpointClock(minCheckpointClock)
 
-    case getStalling: GetStallingTasks =>
+    case GetStallingTasks =>
       sender ! StallingTasks(healthChecker.getReport.stallingTasks)
 
     case ChangeToNewDAG(newDag) =>
@@ -282,6 +280,10 @@ class ClockService(
       })
   }
 
+  private def sendBackUpstreamMinClock(sender: ActorRef, task: TaskId): Unit = {
+    sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
+  }
+
   private def removeProcessor(processorId: ProcessorId): Unit = {
     clocks = clocks - processorId
     processorClocks = processorClocks.filter(_.processorId != processorId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 318ebf8..10648b4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -30,14 +30,19 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.metrics.Metrics
 import org.apache.gearpump.serializer.SerializationFramework
 import org.apache.gearpump.streaming.AppMasterToExecutor._
+import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.task.TaskActor._
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
 import org.apache.gearpump.{MAX_TIME_MILLIS, Message, MIN_TIME_MILLIS, TimeStamp}
 
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+
 /**
- *
- * All tasks of Gearpump runs inside a Actor. TaskActor is the Actor container for a task.
+ * All tasks of Gearpump run inside an Actor. TaskActor is the Actor container for a task.
  */
 class TaskActor(
     val taskId: TaskId,
@@ -48,20 +53,17 @@ class TaskActor(
     extends Actor with ExpressTransport with TimeOutScheduler {
   private var upstreamMinClock: TimeStamp = MIN_TIME_MILLIS
   private var _minClock: TimeStamp = MIN_TIME_MILLIS
-  private var minClockReported: Boolean = true
 
   def serializerPool: SerializationFramework = inputSerializerPool
 
-  import taskContextData._
-
-  import org.apache.gearpump.streaming.Constants._
-  import org.apache.gearpump.streaming.task.TaskActor._
-  val config = context.system.settings.config
+  private val config = context.system.settings.config
 
-  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
+  val LOG: Logger = LogUtil.getLogger(getClass,
+    app = taskContextData.appId, executor = taskContextData.executorId, task = taskId)
 
   // Metrics
-  private val metricName = s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
+  private val metricName =
+    s"app${taskContextData.appId}.processor${taskId.processorId}.task${taskId.index}"
   private val receiveLatency = Metrics(context.system).histogram(
     s"$metricName:receiveLatency", sampleRate = 1)
   private val processTime = Metrics(context.system).histogram(s"$metricName:processTime")
@@ -76,7 +78,6 @@ class TaskActor(
   private var life = taskContextData.life
 
   // Latency probe
-  import scala.concurrent.duration._
 
   import context.dispatcher
   final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
@@ -137,7 +138,7 @@ class TaskActor(
   }
 
   final override def preStart(): Unit = {
-    val register = RegisterTask(taskId, executorId, local)
+    val register = RegisterTask(taskId, taskContextData.executorId, local)
     LOG.info(s"$register")
     executor ! register
     context.become(waitForTaskRegistered)
@@ -175,15 +176,14 @@ class TaskActor(
 
   private def onStartClock(): Unit = {
     LOG.info(s"received start, clock: $upstreamMinClock, sessionId: $sessionId")
-    subscriptions = subscribers.map { subscriber =>
+    subscriptions = taskContextData.subscribers.map { subscriber =>
       (subscriber.processorId,
-        new Subscription(appId, executorId, taskId, subscriber, sessionId, this,
-          maxPendingMessageCount, ackOnceEveryMessageCount))
+        new Subscription(taskContextData.appId, taskContextData.executorId, taskId, subscriber,
+          sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount))
     }.sortBy(_._1)
 
     subscriptions.foreach(_._2.start())
 
-    import scala.collection.JavaConverters._
     stashQueue.asScala.foreach { item =>
       handleMessages(item.sender).apply(item.msg)
     }
@@ -194,13 +194,13 @@ class TaskActor(
     // target
     onStart(Instant.ofEpochMilli(_minClock))
 
-    appMaster ! GetUpstreamMinClock(taskId)
+    taskContextData.appMaster ! GetUpstreamMinClock(taskId)
     context.become(handleMessages(sender))
   }
 
   def waitForTaskRegistered: Receive = {
-    case start@TaskRegistered(_, sessionId, startClock) =>
-      this.sessionId = sessionId
+    case TaskRegistered(_, id, startClock) =>
+      this.sessionId = id
       this._minClock = startClock
       context.become(waitForStartClock)
   }
@@ -208,7 +208,8 @@ class TaskActor(
   private val stashQueue = new util.LinkedList[MessageAndSender]()
 
   def waitForStartClock: Receive = {
-    case start: StartTask =>
+    case start@StartTask(tid) =>
+      assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}")
       onStartClock()
     case other: AnyRef =>
       stashQueue.add(MessageAndSender(other, sender()))
@@ -221,6 +222,7 @@ class TaskActor(
         queue.add(SendAck(ackResponse, ackRequest.taskId))
         doHandleMessage()
       }
+
     case ackRequest: AckRequest =>
       // Enqueue to handle the ackRequest and send back ack later
       val ackResponse = securityChecker.generateAckResponse(ackRequest, sender,
@@ -229,36 +231,45 @@ class TaskActor(
         queue.add(SendAck(ackResponse, ackRequest.taskId))
         doHandleMessage()
       }
+
     case ack: Ack =>
       subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack))
       doHandleMessage()
+
     case inputMessage: SerializedMessage =>
       val message = Message(serializerPool.get().deserialize(inputMessage.bytes),
         inputMessage.timeStamp)
       receiveMessage(message, sender)
+
     case inputMessage: Message =>
       receiveMessage(inputMessage, sender)
-    case watermark@Watermark(instant) =>
-      if (self.eq(sender) && minClockReported) {
-        updateUpstreamMinClock(instant.toEpochMilli)
-        minClockReported = false
-      }
 
+    case watermark@Watermark(instant) =>
+      assert(sender.eq(self), "Watermark should only be sent from Task to itself")
+      onUpstreamMinClock(instant.toEpochMilli)
       receiveMessage(watermark.toMessage, sender)
 
     case UpstreamMinClock(upstreamClock) =>
-      updateUpstreamMinClock(upstreamClock)
-
-    case ChangeTask(_, dagVersion, life, subscribers) =>
-      this.life = life
+      // 1. Received from ClockService; this task reports its minClock back after
+      //    CLOCK_REPORT_INTERVAL, and ClockService replies with another update, forming a loop.
+      // 2. The loop is kicked off by GetUpstreamMinClock on start.
+      // 3. upstreamClock is None for a source task, whose clock instead arrives as a
+      //    Watermark (handled above) from the external source.
+      // 4. This design avoids flooding the ClockService.
+      upstreamClock.foreach(onUpstreamMinClock)
+      reportMinClock()
+
+    case ChangeTask(_, dagVersion, newLife, subscribers) =>
+      this.life = newLife
       subscribers.foreach { subscriber =>
         val processorId = subscriber.processorId
         val subscription = getSubscription(processorId)
         subscription match {
-          case Some(subscription) =>
-            subscription.changeLife(subscriber.lifeTime cross this.life)
+          case Some(subs) =>
+            subs.changeLife(subscriber.lifeTime cross this.life)
           case None =>
-            val subscription = new Subscription(appId, executorId, taskId, subscriber,
+            val subscription = new Subscription(taskContextData.appId,
+              taskContextData.executorId, taskId, subscriber,
               sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)
             subscription.start()
             subscriptions :+=(subscriber.processorId, subscription)
@@ -267,11 +278,13 @@ class TaskActor(
         }
       }
       sender ! TaskChanged(taskId, dagVersion)
+
     case LatencyProbe(timeStamp) =>
       receiveLatency.update(System.currentTimeMillis() - timeStamp)
-    case send: SendMessageLoss =>
-      LOG.info("received SendMessageLoss")
+
+    case SendMessageLoss =>
       throw new MsgLostException
+
     case other: AnyRef =>
       queue.add(other)
       doHandleMessage()
@@ -310,7 +323,19 @@ class TaskActor(
     subscriptions.find(_._1 == processorId).map(_._2)
   }
 
-  private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
+  /**
+   * On receiving upstream min clock, this task will
+   *
+   *   1. update its upstreamMinClock and trigger watermark progress method
+   *      if the new value is larger
+   *   2. update its own min clock
+   *   3. check for its own lifetime
+   *
+   * @param upstreamClock for a DataSourceTask, the clock comes from DataSource.getWatermark;
+   *                      for other tasks, it is the min clock that upstream tasks
+   *                      reported to ClockService
+   */
+  private def onUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
     if (upstreamClock > this.upstreamMinClock) {
       this.upstreamMinClock = upstreamClock
       task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
@@ -328,22 +353,22 @@ class TaskActor(
 
     _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
 
-    val update = UpdateClock(taskId, _minClock)
-    context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
-      appMaster ! update
-      minClockReported = true
-    }
-
-
     // Checks whether current task is dead.
     if (_minClock > life.death) {
       // There will be no more message received...
-      val unRegister = UnRegisterTask(taskId, executorId)
+      val unRegister = UnRegisterTask(taskId, taskContextData.executorId)
       executor ! unRegister
 
       LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
     }
   }
+
+  private def reportMinClock(): Unit = {
+    val update = UpdateClock(taskId, _minClock)
+    context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
+      taskContextData.appMaster ! update
+    }
+  }
 }
 
 object TaskActor {
@@ -412,7 +437,7 @@ object TaskActor {
 
   case object FLUSH
 
-  val NONE_SESSION = -1
+  val NONE_SESSION: Int = -1
 
   case class MessageAndSender(msg: AnyRef, sender: ActorRef)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index 73cd5af..c2e2faa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -54,7 +54,7 @@ case object GetCheckpointClock extends ClockEvent
 
 case class CheckpointClock(clock: Option[TimeStamp])
 
-case class UpstreamMinClock(latestMinClock: TimeStamp)
+case class UpstreamMinClock(latestMinClock: Option[TimeStamp])
 
 case class LatestMinClock(clock: TimeStamp)
 
@@ -67,7 +67,7 @@ case object EndingClock
 /** Probe the latency between two upstream to downstream tasks. */
 case class LatencyProbe(timestamp: Long)
 
-case class SendMessageLoss()
+case object SendMessageLoss
 
 case object GetDAG
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index c1791aa..4faa058 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -154,6 +154,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
 
       // clock status: task(0,0) -> 1, task(0,1)->0, task(1,0)->0, task(1,1)->0
       appMaster.tell(UpdateClock(TaskId(0, 0), 1), mockTask.ref)
+      mockTask.expectMsg(UpstreamMinClock(None))
 
       // check min clock
       appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -161,6 +162,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
 
       // clock status: task(0,0) -> 1, task(0,1)->1, task(1, 0)->0, task(1,1)->0
       appMaster.tell(UpdateClock(TaskId(0, 1), 1), mockTask.ref)
+      mockTask.expectMsg(UpstreamMinClock(None))
 
       // check min clock
       appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -170,7 +172,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       appMaster.tell(UpdateClock(TaskId(1, 0), 1), mockTask.ref)
 
       // Min clock of processor 0 (Task(0, 0) and Task(0, 1))
-      mockTask.expectMsg(UpstreamMinClock(1))
+      mockTask.expectMsg(UpstreamMinClock(Some(1)))
 
       // check min clock
       appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -180,7 +182,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       appMaster.tell(UpdateClock(TaskId(1, 1), 1), mockTask.ref)
 
       // min clock of processor 0 (Task(0, 0) and Task(0, 1))
-      mockTask.expectMsg(UpstreamMinClock(1))
+      mockTask.expectMsg(UpstreamMinClock(Some(1)))
 
       // check min clock
       appMaster.tell(GetLatestMinClock, mockTask.ref)
@@ -228,6 +230,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       for (i <- 1 to 5) {
         val taskId = TaskId(0, 0)
         appMaster.tell(UpdateClock(taskId, i), mockTask.ref)
+        mockTask.expectMsgType[UpstreamMinClock]
 
         val cause = s"message loss $i from $taskId"
         appMaster.tell(MessageLoss(0, taskId, cause), mockTask.ref)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/0bc65218/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index 4b824e0..ae78980 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -53,11 +53,13 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       val startClock = 100L
       store.put(ClockService.START_CLOCK, startClock)
       val clockService = system.actorOf(Props(new ClockService(dag, appMaster, store)))
+
       clockService ! GetLatestMinClock
       expectMsg(LatestMinClock(startClock))
 
       // task(0,0): clock(101); task(1,0): clock(100)
       clockService ! UpdateClock(TaskId(0, 0), 101)
+      expectMsg(UpstreamMinClock(None))
 
       // Min clock is updated
       clockService ! GetLatestMinClock
@@ -67,7 +69,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       clockService ! UpdateClock(TaskId(1, 0), 101)
 
       // Upstream is Task(0, 0), 101
-      expectMsg(UpstreamMinClock(101))
+      expectMsg(UpstreamMinClock(Some(101)))
 
       // Min clock is updated
       clockService ! GetLatestMinClock
@@ -121,6 +123,7 @@ class ClockServiceSpec(_system: ActorSystem) extends TestKit(_system) with Impli
       clockService ! UpdateClock(TaskId(0, 0), 200L)
       clockService ! UpdateClock(TaskId(1, 0), 200L)
       expectMsgType[UpstreamMinClock]
+      expectMsgType[UpstreamMinClock]
 
       clockService ! GetStartClock
       expectMsg(StartClock(200L))