You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this text.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:15 UTC
[05/49] incubator-gearpump git commit: fix #1318, fix MinClock not updated fast enough for slow stream
fix #1318, fix MinClock not updated fast enough for slow stream
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/269838e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/269838e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/269838e8
Branch: refs/heads/master
Commit: 269838e8bb4560156344b3c0931a53fbbb43cc92
Parents: 099842a
Author: manuzhang <ow...@gmail.com>
Authored: Mon Dec 14 18:21:49 2015 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:22:39 2016 +0800
----------------------------------------------------------------------
.../gearpump/streaming/task/Subscription.scala | 49 +++++++++++++++-----
.../io/gearpump/streaming/task/TaskActor.scala | 46 +++++++++++-------
.../streaming/task/SubscriptionSpec.scala | 4 +-
3 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
index 6836687..5f321e4 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
@@ -107,18 +107,15 @@ class Subscription(
this.minClockValue(partition) = Math.min(this.minClockValue(partition), msg.timestamp)
this.candidateMinClock(partition) = Math.min(this.candidateMinClock(partition), msg.timestamp)
- messageCount(partition) = (messageCount(partition) + 1).toShort
- pendingMessageCount(partition) = (pendingMessageCount(partition) + 1).toShort
- updateMaxPendingCount()
+ incrementMessageCount(partition, 1)
if (messageCount(partition) % ackOnceEveryMessageCount == 0) {
- val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
- transport.transport(ackRequest, targetTask)
+ sendAckRequest(partition)
}
- if (messageCount(partition) % maxPendingMessageCount == 0) {
- val probeLatency = LatencyProbe(System.currentTimeMillis())
- transport.transport(probeLatency, targetTask)
+ if (messageCount(partition) / maxPendingMessageCount !=
+ (messageCount(partition) + ackOnceEveryMessageCount) / maxPendingMessageCount) {
+ sendLatencyProbe(partition)
}
return 1
@@ -140,8 +137,7 @@ class Subscription(
private def flush: Unit = {
lastFlushTime = System.currentTimeMillis()
allTasks.foreach { targetTaskId =>
- val ackRequest = AckRequest(taskId, messageCount(targetTaskId.index), sessionId)
- transport.transport(ackRequest, targetTaskId)
+ sendAckRequest(targetTaskId.index)
}
}
@@ -189,9 +185,40 @@ class Subscription(
maxPendingCount < maxPendingMessageCount
}
- private def updateMaxPendingCount() : Unit = {
+ def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
+ minClockValue.indices.foreach { i =>
+ if (minClockValue(i) == stallingTime && allowSendingMoreMessages) {
+ sendAckRequest(i)
+ sendLatencyProbe(i)
+ }
+ }
+ }
+
+ private def sendAckRequest(partition: Int): Unit = {
+ // we increment more count for each AckRequest
+ // to throttle the number of unacked AckRequest
+ incrementMessageCount(partition, ackOnceEveryMessageCount)
+ val targetTask = TaskId(processorId, partition)
+ val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
+ transport.transport(ackRequest, targetTask)
+ }
+
+ private def incrementMessageCount(partition: Int, count: Int): Unit = {
+ messageCount(partition) = (messageCount(partition) + count).toShort
+ pendingMessageCount(partition) = (pendingMessageCount(partition) + count).toShort
+ updateMaxPendingCount()
+ }
+
+ private def updateMaxPendingCount(): Unit = {
maxPendingCount = Shorts.max(pendingMessageCount: _*)
}
+
+ private def sendLatencyProbe(partition: Int): Unit = {
+ val probeLatency = LatencyProbe(System.currentTimeMillis())
+ val targetTask = TaskId(processorId, partition)
+ transport.transport(probeLatency, targetTask)
+ }
+
}
object Subscription {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
index 9a3ceff..d54fce5 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
@@ -26,7 +26,7 @@ import io.gearpump.cluster.UserConfig
import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
import io.gearpump.metrics.Metrics
import io.gearpump.serializer.SerializationFramework
-import io.gearpump.streaming.AppMasterToExecutor.{TaskRejected, _}
+import io.gearpump.streaming.AppMasterToExecutor._
import io.gearpump.streaming.ExecutorToAppMaster._
import io.gearpump.streaming.{Constants, ProcessorId}
import io.gearpump.util.{LogUtil, TimeOutScheduler}
@@ -46,6 +46,7 @@ class TaskActor(
inputSerializerPool: SerializationFramework)
extends Actor with ExpressTransport with TimeOutScheduler{
var upstreamMinClock: TimeStamp = 0L
+ private var _minClock: TimeStamp = 0L
def serializerPool: SerializationFramework = inputSerializerPool
@@ -137,12 +138,6 @@ class TaskActor(
context.become(waitForTaskRegistered)
}
- def minClockAtCurrentTask: TimeStamp = {
- this.subscriptions.foldLeft(Long.MaxValue){ (clock, subscription) =>
- Math.min(clock, subscription._2.minClock)
- }
- }
-
private def allowSendingMoreMessages(): Boolean = {
subscriptions.forall(_._2.allowSendingMoreMessages())
}
@@ -227,7 +222,7 @@ class TaskActor(
}
case ackRequest: AckRequest =>
//enqueue to handle the ackRequest and send back ack later
- val ackResponse = securityChecker.generateAckResponse(ackRequest, sender)
+ val ackResponse = securityChecker.generateAckResponse(ackRequest, sender, ackOnceEveryMessageCount)
if (null != ackResponse) {
queue.add(SendAck(ackResponse, ackRequest.taskId))
doHandleMessage()
@@ -242,19 +237,31 @@ class TaskActor(
receiveMessage(inputMessage, sender)
case upstream@ UpstreamMinClock(upstreamClock) =>
this.upstreamMinClock = upstreamClock
- val latestMinClock = minClock
- val update = UpdateClock(taskId, latestMinClock)
+
+ val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
+ val subMin = sub._2.minClock
+ // a subscription is holding back the _minClock;
+ // we send AckRequest to its tasks to push _minClock forward
+ if (subMin == _minClock) {
+ sub._2.sendAckRequestOnStallingTime(_minClock)
+ }
+ Math.min(min, subMin)
+ }
+
+ _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
+
+ val update = UpdateClock(taskId, _minClock)
context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
appMaster ! update
}
// check whether current task is dead.
- if (latestMinClock > life.death) {
+ if (_minClock > life.death) {
// There will be no more message received...
val unRegister = UnRegisterTask(taskId, executorId)
executor ! unRegister
- LOG.info(s"Sending $unRegister, current minclock: $latestMinClock, life: $life")
+ LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
}
case ChangeTask(_, dagVersion, life, subscribers) =>
@@ -285,10 +292,14 @@ class TaskActor(
doHandleMessage()
}
- def minClock: TimeStamp = {
- Math.max(life.birth, Math.min(upstreamMinClock, minClockAtCurrentTask))
- }
+ /**
+ * @return min clock of this task
+ */
+ def minClock: TimeStamp = _minClock
+ /**
+ * @return min clock of upstream task
+ */
def getUpstreamMinClock: TimeStamp = upstreamMinClock
private def receiveMessage(msg: Message, sender: ActorRef): Unit = {
@@ -341,9 +352,12 @@ object TaskActor {
}
}
- def generateAckResponse(ackRequest: AckRequest, sender: ActorRef): Ack = {
+ def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = {
val sessionId = ackRequest.sessionId
if (receivedMsgCount.containsKey(sessionId)) {
+ // we increment more count for each AckRequest
+ // to throttle the number of unacked AckRequest
+ receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
} else {
LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/269838e8/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
index 138456a..ecad47b 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
@@ -100,10 +100,10 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
// we expect to receive two ackRequest for two downstream tasks
- val ackRequestForTask0 = AckRequest(taskId, 100, session)
+ val ackRequestForTask0 = AckRequest(taskId, 200, session)
verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1,0))
- val ackRequestForTask1 = AckRequest(taskId, 100, session)
+ val ackRequestForTask1 = AckRequest(taskId, 200, session)
verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
}