You are viewing a plain text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/10/26 22:36:28 UTC

incubator-gearpump git commit: [GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 0bc6ac375 -> cb6aced50


[GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription

Author: huafengw <fv...@gmail.com>

Closes #234 from huafengw/subs.

fix style


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/cb6aced5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/cb6aced5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/cb6aced5

Branch: refs/heads/master
Commit: cb6aced500e95b95e4ef0e870db7017d5647fbaa
Parents: 0bc6ac3
Author: huafengw <fv...@gmail.com>
Authored: Fri Oct 27 06:34:50 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Oct 27 06:36:10 2017 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/MessageSerializer.scala  | 12 ++++--
 .../gearpump/streaming/task/Subscription.scala  | 14 ++-----
 .../gearpump/streaming/task/TaskActor.scala     |  5 ++-
 .../streaming/task/TaskControlMessage.scala     |  5 ++-
 .../streaming/MessageSerializerSpec.scala       |  6 +--
 .../streaming/task/SubscriptionSpec.scala       | 42 ++++++++++++--------
 .../gearpump/streaming/task/TaskActorSpec.scala |  2 +-
 7 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
index 20e2529..10879ac 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
@@ -40,13 +40,14 @@ class TaskIdSerializer extends TaskMessageSerializer[TaskId] {
 class AckSerializer extends TaskMessageSerializer[Ack] {
   val taskIdSerializer = new TaskIdSerializer
 
-  override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 8
+  override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 16
 
   override def write(dataOutput: DataOutput, obj: Ack): Unit = {
     taskIdSerializer.write(dataOutput, obj.taskId)
     dataOutput.writeShort(obj.seq)
     dataOutput.writeShort(obj.actualReceivedNum)
     dataOutput.writeInt(obj.sessionId)
+    dataOutput.writeLong(obj.watermark)
   }
 
   override def read(dataInput: DataInput): Ack = {
@@ -54,7 +55,8 @@ class AckSerializer extends TaskMessageSerializer[Ack] {
     val seq = dataInput.readShort()
     val actualReceivedNum = dataInput.readShort()
     val sessionId = dataInput.readInt()
-    Ack(taskId, seq, actualReceivedNum, sessionId)
+    val watermark = dataInput.readLong()
+    Ack(taskId, seq, actualReceivedNum, sessionId, watermark)
   }
 }
 
@@ -78,19 +80,21 @@ class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckReques
 class AckRequestSerializer extends TaskMessageSerializer[AckRequest] {
   val taskIdSerializer = new TaskIdSerializer
 
-  override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6
+  override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 14
 
   override def write(dataOutput: DataOutput, obj: AckRequest): Unit = {
     taskIdSerializer.write(dataOutput, obj.taskId)
     dataOutput.writeShort(obj.seq)
     dataOutput.writeInt(obj.sessionId)
+    dataOutput.writeLong(obj.watermark)
   }
 
   override def read(dataInput: DataInput): AckRequest = {
     val taskId = taskIdSerializer.read(dataInput)
     val seq = dataInput.readShort()
     val sessionId = dataInput.readInt()
-    AckRequest(taskId, seq, sessionId)
+    val watermark = dataInput.readLong()
+    AckRequest(taskId, seq, sessionId, watermark)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 24f1763..ab99323 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -58,12 +58,9 @@ class Subscription(
   // Don't worry if this store negative number. We will wrap the Short
   private val messageCount: Array[Short] = new Array[Short](parallelism)
   private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
-  private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism)
 
   private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
     Watermark.MIN.toEpochMilli)
-  private val processingWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
-    Watermark.MIN.toEpochMilli)
 
   private var maxPendingCount: Short = 0
 
@@ -113,8 +110,6 @@ class Subscription(
       val targetTask = TaskId(processorId, partition)
       publisher.transport(msg, targetTask)
 
-      this.processingWatermark(partition) = publisher.getProcessingWatermark.toEpochMilli
-
       incrementMessageCount(partition, 1)
 
       if (messageCount(partition) % ackOnceEveryMessageCount == 0) {
@@ -162,14 +157,12 @@ class Subscription(
    * @param ack acknowledge message received
    */
   def receiveAck(ack: Ack): Unit = {
-
     val index = ack.taskId.index
 
     if (ack.sessionId == sessionId) {
       if (ack.actualReceivedNum == ack.seq) {
-        if ((ack.seq - processingWatermarkSince(index)).toShort >= 0) {
-          outputWatermark(index) = processingWatermark(index)
-          processingWatermarkSince(index) = messageCount(index)
+        if (ack.watermark > outputWatermark(index)) {
+          outputWatermark(index) = ack.watermark
         }
 
         pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort
@@ -209,7 +202,8 @@ class Subscription(
     // to throttle the number of unacked AckRequest
     incrementMessageCount(partition, ackOnceEveryMessageCount)
     val targetTask = TaskId(processorId, partition)
-    val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
+    val processingWatermark = publisher.getProcessingWatermark.toEpochMilli
+    val ackRequest = AckRequest(taskId, messageCount(partition), sessionId, processingWatermark)
     publisher.transport(ackRequest, targetTask)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index b43457e..9d9778e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -392,7 +392,7 @@ object TaskActor {
         null
       } else {
         receivedMsgCount.put(sessionId, 0)
-        Ack(task_id, 0, 0, sessionId)
+        Ack(task_id, 0, 0, sessionId, Watermark.MIN.toEpochMilli)
       }
     }
 
@@ -402,7 +402,8 @@ object TaskActor {
         // Increments more count for each AckRequest
         // to throttle the number of unacked AckRequest
         receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
-        Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
+        Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId,
+          ackRequest.watermark)
       } else {
         LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}")
         null

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index 4ba9315..ffa3134 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -30,7 +30,7 @@ case class InitialAckRequest(taskId: TaskId, sessionId: Int)
   Here the sessionId filed is used to distinguish messages
     between different replays after the application restart
  */
-case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
+case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int, watermark: Long)
 
 /**
  * Ack back to sender task actor.
@@ -38,7 +38,8 @@ case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
  * @param seq The seq field represents the expected number of received messages and the
  *            actualReceivedNum field means the actual received number since start.
  */
-case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int)
+case class Ack(
+    taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int, watermark: Long)
 
 sealed trait ClockEvent
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
index f6f6af2..1ab2358 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
@@ -17,9 +17,9 @@
  */
 package org.apache.gearpump.streaming
 
+import org.apache.gearpump.streaming.source.Watermark
 import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers}
 import org.scalatest.{Matchers, WordSpec}
-
 import org.apache.gearpump.streaming.task._
 import org.apache.gearpump.transport.netty.WrappedChannelBuffer
 
@@ -55,7 +55,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
   "AckRequestSerializer" should {
     "serialize and deserialize AckRequest properly" in {
       val serializer = new AckRequestSerializer
-      val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024)
+      val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024, Watermark.MAX.toEpochMilli)
       assert(testSerializer(ackRequest, serializer).equals(ackRequest))
     }
   }
@@ -71,7 +71,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
   "AckSerializer" should {
     "serialize and deserialize Ack properly" in {
       val serializer = new AckSerializer
-      val ack = Ack(TaskId(1, 2), 1024, 1023, 1799)
+      val ack = Ack(TaskId(1, 2), 1024, 1023, 1799, Watermark.MAX.toEpochMilli)
       assert(testSerializer(ack, serializer).equals(ack))
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index b05befa..8079928 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -70,6 +70,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
     val (subscription, sender) = prepare
     val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
     when(sender.getProcessingWatermark).thenReturn(msg1.timestamp)
+    // Send first message to Task(1, 1)
     subscription.sendMessage(msg1)
 
     verify(sender, times(1)).transport(msg1, TaskId(1, 1))
@@ -77,6 +78,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 
     val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
     when(sender.getProcessingWatermark).thenReturn(msg2.timestamp)
+    // Send first message to Task(1, 0)
     subscription.sendMessage(msg2)
 
     verify(sender, times(1)).transport(msg2, TaskId(1, 0))
@@ -85,31 +87,39 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
     val initialMinClock = subscription.watermark
 
     // Acks initial AckRequest(0)
-    subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session))
-    subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session))
+    subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session, Watermark.MIN.toEpochMilli))
+    subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session, Watermark.MIN.toEpochMilli))
 
-    // Sends 100 messages
-    100 until 200 foreach { clock =>
-      when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(clock),
-        Instant.ofEpochMilli(clock))
+    // Sends 98 more messages to each downstream task
+    100 until 198 foreach { clock =>
       subscription.sendMessage(Message("1", clock))
       subscription.sendMessage(Message("2", clock))
     }
 
-    assert(subscription.watermark == 50)
+    // Triger sending AckRequest
+    val inOrders = org.mockito.Mockito.inOrder(sender, sender)
 
-    subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session))
-    subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session))
+    val msg3 = Message("1", Instant.ofEpochMilli(200))
+    val expectedAckRequest = AckRequest(taskId, 200, session, 200)
+    when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(200))
+    subscription.sendMessage(msg3)
+    inOrders.verify(sender).transport(msg3, TaskId(1, 1))
+    inOrders.verify(sender).transport(expectedAckRequest, TaskId(1, 1))
 
-    // Ack received, minClock changed
-    assert(subscription.watermark > initialMinClock)
+    val msg4 = Message("2", Instant.ofEpochMilli(200))
+    val expectedAckRequest2 = AckRequest(taskId, 200, session, 220)
+    when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(220))
+    subscription.sendMessage(msg4)
+    inOrders.verify(sender).transport(msg4, TaskId(1, 0))
+    inOrders.verify(sender).transport(expectedAckRequest2, TaskId(1, 0))
+
+    assert(subscription.watermark == initialMinClock)
 
-    // Expects to receive two ackRequest for two downstream tasks
-    val ackRequestForTask0 = AckRequest(taskId, 200, session)
-    verify(sender, times(1)).transport(ackRequestForTask0, TaskId(1, 0))
+    subscription.receiveAck(Ack(TaskId(1, 1), 200, 200, session, 200))
+    subscription.receiveAck(Ack(TaskId(1, 0), 200, 200, session, 220))
 
-    val ackRequestForTask1 = AckRequest(taskId, 200, session)
-    verify(sender, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
+    // Ack received, minClock changed
+    assert(subscription.watermark == 200)
   }
 
   it should "disallow more message sending if there is no ack back" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
index 8deee78..3906b36 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
@@ -80,7 +80,7 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       testActor ! StartTask(taskId1)
 
       implicit val system = getActorSystem
-      val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId)
+      val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId, 1024L)
       EventFilter[MsgLostException](occurrences = 1) intercept {
         testActor ! ack
       }