You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/10/26 22:36:28 UTC
incubator-gearpump git commit: [GEARPUMP-359] Fix OutputWatermark
advancing logic in Subscription
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 0bc6ac375 -> cb6aced50
[GEARPUMP-359] Fix OutputWatermark advancing logic in Subscription
Author: huafengw <fv...@gmail.com>
Closes #234 from huafengw/subs.
fix style
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/cb6aced5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/cb6aced5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/cb6aced5
Branch: refs/heads/master
Commit: cb6aced500e95b95e4ef0e870db7017d5647fbaa
Parents: 0bc6ac3
Author: huafengw <fv...@gmail.com>
Authored: Fri Oct 27 06:34:50 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Oct 27 06:36:10 2017 +0800
----------------------------------------------------------------------
.../gearpump/streaming/MessageSerializer.scala | 12 ++++--
.../gearpump/streaming/task/Subscription.scala | 14 ++-----
.../gearpump/streaming/task/TaskActor.scala | 5 ++-
.../streaming/task/TaskControlMessage.scala | 5 ++-
.../streaming/MessageSerializerSpec.scala | 6 +--
.../streaming/task/SubscriptionSpec.scala | 42 ++++++++++++--------
.../gearpump/streaming/task/TaskActorSpec.scala | 2 +-
7 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
index 20e2529..10879ac 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/MessageSerializer.scala
@@ -40,13 +40,14 @@ class TaskIdSerializer extends TaskMessageSerializer[TaskId] {
class AckSerializer extends TaskMessageSerializer[Ack] {
val taskIdSerializer = new TaskIdSerializer
- override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 8
+ override def getLength(obj: Ack): Int = taskIdSerializer.getLength(obj.taskId) + 16
override def write(dataOutput: DataOutput, obj: Ack): Unit = {
taskIdSerializer.write(dataOutput, obj.taskId)
dataOutput.writeShort(obj.seq)
dataOutput.writeShort(obj.actualReceivedNum)
dataOutput.writeInt(obj.sessionId)
+ dataOutput.writeLong(obj.watermark)
}
override def read(dataInput: DataInput): Ack = {
@@ -54,7 +55,8 @@ class AckSerializer extends TaskMessageSerializer[Ack] {
val seq = dataInput.readShort()
val actualReceivedNum = dataInput.readShort()
val sessionId = dataInput.readInt()
- Ack(taskId, seq, actualReceivedNum, sessionId)
+ val watermark = dataInput.readLong()
+ Ack(taskId, seq, actualReceivedNum, sessionId, watermark)
}
}
@@ -78,19 +80,21 @@ class InitialAckRequestSerializer extends TaskMessageSerializer[InitialAckReques
class AckRequestSerializer extends TaskMessageSerializer[AckRequest] {
val taskIdSerializer = new TaskIdSerializer
- override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 6
+ override def getLength(obj: AckRequest): Int = taskIdSerializer.getLength(obj.taskId) + 14
override def write(dataOutput: DataOutput, obj: AckRequest): Unit = {
taskIdSerializer.write(dataOutput, obj.taskId)
dataOutput.writeShort(obj.seq)
dataOutput.writeInt(obj.sessionId)
+ dataOutput.writeLong(obj.watermark)
}
override def read(dataInput: DataInput): AckRequest = {
val taskId = taskIdSerializer.read(dataInput)
val seq = dataInput.readShort()
val sessionId = dataInput.readInt()
- AckRequest(taskId, seq, sessionId)
+ val watermark = dataInput.readLong()
+ AckRequest(taskId, seq, sessionId, watermark)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 24f1763..ab99323 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -58,12 +58,9 @@ class Subscription(
// Don't worry if this stores a negative number. We will wrap the Short
private val messageCount: Array[Short] = new Array[Short](parallelism)
private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
- private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism)
private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
Watermark.MIN.toEpochMilli)
- private val processingWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
- Watermark.MIN.toEpochMilli)
private var maxPendingCount: Short = 0
@@ -113,8 +110,6 @@ class Subscription(
val targetTask = TaskId(processorId, partition)
publisher.transport(msg, targetTask)
- this.processingWatermark(partition) = publisher.getProcessingWatermark.toEpochMilli
-
incrementMessageCount(partition, 1)
if (messageCount(partition) % ackOnceEveryMessageCount == 0) {
@@ -162,14 +157,12 @@ class Subscription(
* @param ack acknowledge message received
*/
def receiveAck(ack: Ack): Unit = {
-
val index = ack.taskId.index
if (ack.sessionId == sessionId) {
if (ack.actualReceivedNum == ack.seq) {
- if ((ack.seq - processingWatermarkSince(index)).toShort >= 0) {
- outputWatermark(index) = processingWatermark(index)
- processingWatermarkSince(index) = messageCount(index)
+ if (ack.watermark > outputWatermark(index)) {
+ outputWatermark(index) = ack.watermark
}
pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort
@@ -209,7 +202,8 @@ class Subscription(
// to throttle the number of unacked AckRequest
incrementMessageCount(partition, ackOnceEveryMessageCount)
val targetTask = TaskId(processorId, partition)
- val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
+ val processingWatermark = publisher.getProcessingWatermark.toEpochMilli
+ val ackRequest = AckRequest(taskId, messageCount(partition), sessionId, processingWatermark)
publisher.transport(ackRequest, targetTask)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index b43457e..9d9778e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -392,7 +392,7 @@ object TaskActor {
null
} else {
receivedMsgCount.put(sessionId, 0)
- Ack(task_id, 0, 0, sessionId)
+ Ack(task_id, 0, 0, sessionId, Watermark.MIN.toEpochMilli)
}
}
@@ -402,7 +402,8 @@ object TaskActor {
// Increments more count for each AckRequest
// to throttle the number of unacked AckRequest
receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
- Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
+ Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId,
+ ackRequest.watermark)
} else {
LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}")
null
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index 4ba9315..ffa3134 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -30,7 +30,7 @@ case class InitialAckRequest(taskId: TaskId, sessionId: Int)
Here the sessionId field is used to distinguish messages
between different replays after the application restart
*/
-case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
+case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int, watermark: Long)
/**
* Ack back to sender task actor.
@@ -38,7 +38,8 @@ case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)
* @param seq The seq field represents the expected number of received messages and the
* actualReceivedNum field means the actual received number since start.
*/
-case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int)
+case class Ack(
+ taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int, watermark: Long)
sealed trait ClockEvent
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
index f6f6af2..1ab2358 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/MessageSerializerSpec.scala
@@ -17,9 +17,9 @@
*/
package org.apache.gearpump.streaming
+import org.apache.gearpump.streaming.source.Watermark
import org.jboss.netty.buffer.{ChannelBufferOutputStream, ChannelBuffers}
import org.scalatest.{Matchers, WordSpec}
-
import org.apache.gearpump.streaming.task._
import org.apache.gearpump.transport.netty.WrappedChannelBuffer
@@ -55,7 +55,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
"AckRequestSerializer" should {
"serialize and deserialize AckRequest properly" in {
val serializer = new AckRequestSerializer
- val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024)
+ val ackRequest = AckRequest(TaskId(1, 2), 1000, 1024, Watermark.MAX.toEpochMilli)
assert(testSerializer(ackRequest, serializer).equals(ackRequest))
}
}
@@ -71,7 +71,7 @@ class MessageSerializerSpec extends WordSpec with Matchers {
"AckSerializer" should {
"serialize and deserialize Ack properly" in {
val serializer = new AckSerializer
- val ack = Ack(TaskId(1, 2), 1024, 1023, 1799)
+ val ack = Ack(TaskId(1, 2), 1024, 1023, 1799, Watermark.MAX.toEpochMilli)
assert(testSerializer(ack, serializer).equals(ack))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index b05befa..8079928 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -70,6 +70,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
val (subscription, sender) = prepare
val msg1 = Message("1", timestamp = Instant.ofEpochMilli(70))
when(sender.getProcessingWatermark).thenReturn(msg1.timestamp)
+ // Send first message to Task(1, 1)
subscription.sendMessage(msg1)
verify(sender, times(1)).transport(msg1, TaskId(1, 1))
@@ -77,6 +78,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
when(sender.getProcessingWatermark).thenReturn(msg2.timestamp)
+ // Send first message to Task(1, 0)
subscription.sendMessage(msg2)
verify(sender, times(1)).transport(msg2, TaskId(1, 0))
@@ -85,31 +87,39 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
val initialMinClock = subscription.watermark
// Acks initial AckRequest(0)
- subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session))
- subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session))
+ subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session, Watermark.MIN.toEpochMilli))
+ subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session, Watermark.MIN.toEpochMilli))
- // Sends 100 messages
- 100 until 200 foreach { clock =>
- when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(clock),
- Instant.ofEpochMilli(clock))
+ // Sends 98 more messages to each downstream task
+ 100 until 198 foreach { clock =>
subscription.sendMessage(Message("1", clock))
subscription.sendMessage(Message("2", clock))
}
- assert(subscription.watermark == 50)
+ // Trigger sending AckRequest
+ val inOrders = org.mockito.Mockito.inOrder(sender, sender)
- subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session))
- subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session))
+ val msg3 = Message("1", Instant.ofEpochMilli(200))
+ val expectedAckRequest = AckRequest(taskId, 200, session, 200)
+ when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(200))
+ subscription.sendMessage(msg3)
+ inOrders.verify(sender).transport(msg3, TaskId(1, 1))
+ inOrders.verify(sender).transport(expectedAckRequest, TaskId(1, 1))
- // Ack received, minClock changed
- assert(subscription.watermark > initialMinClock)
+ val msg4 = Message("2", Instant.ofEpochMilli(200))
+ val expectedAckRequest2 = AckRequest(taskId, 200, session, 220)
+ when(sender.getProcessingWatermark).thenReturn(Instant.ofEpochMilli(220))
+ subscription.sendMessage(msg4)
+ inOrders.verify(sender).transport(msg4, TaskId(1, 0))
+ inOrders.verify(sender).transport(expectedAckRequest2, TaskId(1, 0))
+
+ assert(subscription.watermark == initialMinClock)
- // Expects to receive two ackRequest for two downstream tasks
- val ackRequestForTask0 = AckRequest(taskId, 200, session)
- verify(sender, times(1)).transport(ackRequestForTask0, TaskId(1, 0))
+ subscription.receiveAck(Ack(TaskId(1, 1), 200, 200, session, 200))
+ subscription.receiveAck(Ack(TaskId(1, 0), 200, 200, session, 220))
- val ackRequestForTask1 = AckRequest(taskId, 200, session)
- verify(sender, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
+ // Ack received, minClock changed
+ assert(subscription.watermark == 200)
}
it should "disallow more message sending if there is no ack back" in {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cb6aced5/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
index 8deee78..3906b36 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
@@ -80,7 +80,7 @@ class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with
testActor ! StartTask(taskId1)
implicit val system = getActorSystem
- val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId)
+ val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId, 1024L)
EventFilter[MsgLostException](occurrences = 1) intercept {
testActor ! ack
}