You are viewing a plain-text version of this content; the "here" link to its canonical version was lost in the plain-text conversion.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:15 UTC

[05/49] incubator-gearpump git commit: fix #1318, fix MinClock not being updated fast enough for slow streams

fix #1318, fix MinClock not being updated fast enough for slow streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/269838e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/269838e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/269838e8

Branch: refs/heads/master
Commit: 269838e8bb4560156344b3c0931a53fbbb43cc92
Parents: 099842a
Author: manuzhang <ow...@gmail.com>
Authored: Mon Dec 14 18:21:49 2015 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:22:39 2016 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/task/Subscription.scala  | 49 +++++++++++++++-----
 .../io/gearpump/streaming/task/TaskActor.scala  | 46 +++++++++++-------
 .../streaming/task/SubscriptionSpec.scala       |  4 +-
 3 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
index 6836687..5f321e4 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
@@ -107,18 +107,15 @@ class Subscription(
       this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timestamp)
       this.candidateMinClock(partition) = Math.min(this.candidateMinClock(partition), msg.timestamp)
 
-      messageCount(partition) = (messageCount(partition) + 1).toShort
-      pendingMessageCount(partition) = (pendingMessageCount(partition) + 1).toShort
-      updateMaxPendingCount()
+      incrementMessageCount(partition, 1)
 
       if (messageCount(partition) % ackOnceEveryMessageCount == 0) {
-        val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
-        transport.transport(ackRequest, targetTask)
+        sendAckRequest(partition)
       }
 
-      if (messageCount(partition) % maxPendingMessageCount == 0) {
-        val probeLatency = LatencyProbe(System.currentTimeMillis())
-        transport.transport(probeLatency, targetTask)
+      if (messageCount(partition) / maxPendingMessageCount !=
+        (messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) {
+        sendLatencyProbe(partition)
       }
 
       return 1
@@ -140,8 +137,7 @@ class Subscription(
   private def flush: Unit = {
     lastFlushTime = System.currentTimeMillis()
     allTasks.foreach { targetTaskId =>
-      val ackRequest = AckRequest(taskId, messageCount(targetTaskId.index), sessionId)
-      transport.transport(ackRequest, targetTaskId)
+      sendAckRequest(targetTaskId.index)
     }
   }
 
@@ -189,9 +185,40 @@ class Subscription(
     maxPendingCount < maxPendingMessageCount
   }
 
-  private def updateMaxPendingCount() : Unit = {
+  def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
+    minClockValue.indices.foreach { i =>
+      if (minClockValue(i) == stallingTime && allowSendingMoreMessages) {
+        sendAckRequest(i)
+        sendLatencyProbe(i)
+      }
+    }
+  }
+
+  private def sendAckRequest(partition: Int): Unit = {
+    // we increment more count for each AckRequest
+    // to throttle the number of unacked AckRequest
+    incrementMessageCount(partition, ackOnceEveryMessageCount)
+    val targetTask = TaskId(processorId, partition)
+    val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
+    transport.transport(ackRequest, targetTask)
+  }
+
+  private def incrementMessageCount(partition: Int, count: Int): Unit = {
+    messageCount(partition) = (messageCount(partition) + count).toShort
+    pendingMessageCount(partition) = (pendingMessageCount(partition) + count).toShort
+    updateMaxPendingCount()
+  }
+
+  private def updateMaxPendingCount(): Unit = {
     maxPendingCount = Shorts.max(pendingMessageCount: _*)
   }
+
+  private def sendLatencyProbe(partition: Int): Unit = {
+    val probeLatency = LatencyProbe(System.currentTimeMillis())
+    val targetTask = TaskId(processorId, partition)
+    transport.transport(probeLatency, targetTask)
+  }
+
 }
 
 object Subscription {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
index 9a3ceff..d54fce5 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
@@ -26,7 +26,7 @@ import io.gearpump.cluster.UserConfig
 import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
 import io.gearpump.metrics.Metrics
 import io.gearpump.serializer.SerializationFramework
-import io.gearpump.streaming.AppMasterToExecutor.{TaskRejected, _}
+import io.gearpump.streaming.AppMasterToExecutor._
 import io.gearpump.streaming.ExecutorToAppMaster._
 import io.gearpump.streaming.{Constants, ProcessorId}
 import io.gearpump.util.{LogUtil, TimeOutScheduler}
@@ -46,6 +46,7 @@ class TaskActor(
     inputSerializerPool: SerializationFramework)
   extends Actor with ExpressTransport  with TimeOutScheduler{
   var upstreamMinClock: TimeStamp = 0L
+  private var _minClock: TimeStamp = 0L
 
   def serializerPool: SerializationFramework = inputSerializerPool
 
@@ -137,12 +138,6 @@ class TaskActor(
     context.become(waitForTaskRegistered)
   }
 
-  def minClockAtCurrentTask: TimeStamp = {
-    this.subscriptions.foldLeft(Long.MaxValue){ (clock, subscription) =>
-      Math.min(clock, subscription._2.minClock)
-    }
-  }
-
   private def allowSendingMoreMessages(): Boolean = {
     subscriptions.forall(_._2.allowSendingMoreMessages())
   }
@@ -227,7 +222,7 @@ class TaskActor(
       }
     case ackRequest: AckRequest =>
       //enqueue to handle the ackRequest and send back ack later
-      val ackResponse = securityChecker.generateAckResponse(ackRequest, sender)
+      val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, ackOnceEveryMessageCount)
       if (null != ackResponse) {
         queue.add(SendAck(ackResponse, ackRequest.taskId))
         doHandleMessage()
@@ -242,19 +237,31 @@ class TaskActor(
       receiveMessage(inputMessage, sender)
     case upstream@ UpstreamMinClock(upstreamClock) =>
       this.upstreamMinClock = upstreamClock
-      val latestMinClock = minClock
-      val update = UpdateClock(taskId, latestMinClock)
+
+      val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
+        val subMin = sub._2.minClock
+        // a subscription is holding back the _minClock;
+        // we send AckRequest to its tasks to push _minClock forward
+        if (subMin == _minClock) {
+          sub._2.sendAckRequestOnStallingTime(_minClock)
+        }
+        Math.min(min, subMin)
+      }
+
+      _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
+
+      val update = UpdateClock(taskId, _minClock)
       context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
         appMaster ! update
       }
 
       // check whether current task is dead.
-      if (latestMinClock > life.death) {
+      if (_minClock > life.death) {
         // There will be no more message received...
         val unRegister = UnRegisterTask(taskId, executorId)
         executor ! unRegister
 
-        LOG.info(s"Sending $unRegister, current minclock: $latestMinClock, life: $life")
+        LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
       }
 
     case ChangeTask(_, dagVersion, life, subscribers) =>
@@ -285,10 +292,14 @@ class TaskActor(
       doHandleMessage()
   }
 
-  def minClock: TimeStamp = {
-    Math.max(life.birth, Math.min(upstreamMinClock, minClockAtCurrentTask))
-  }
+  /**
+   * @return min clock of this task
+   */
+  def minClock: TimeStamp = _minClock
 
+  /**
+   * @return min clock of upstream task
+   */
   def getUpstreamMinClock: TimeStamp = upstreamMinClock
 
   private def receiveMessage(msg: Message, sender: ActorRef): Unit = {
@@ -341,9 +352,12 @@ object TaskActor {
       }
     }
 
-    def generateAckResponse(ackRequest: AckRequest, sender: ActorRef): Ack = {
+    def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = {
       val sessionId = ackRequest.sessionId
       if (receivedMsgCount.containsKey(sessionId)) {
+        // we increment more count for each AckRequest
+        // to throttle the number of unacked AckRequest
+        receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
         Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
       } else {
         LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
index 138456a..ecad47b 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
@@ -100,10 +100,10 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 
 
     // we expect to receive two ackRequest for two downstream tasks
-    val ackRequestForTask0 = AckRequest(taskId, 100, session)
+    val ackRequestForTask0 = AckRequest(taskId, 200, session)
     verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1,0))
 
-    val ackRequestForTask1 = AckRequest(taskId, 100, session)
+    val ackRequestForTask1 = AckRequest(taskId, 200, session)
     verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
   }