You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/23 00:39:11 UTC
samza git commit: SAMZA-2022: Integrate startpoints with SystemConsumers
Repository: samza
Updated Branches:
refs/heads/master 18ebfce33 -> 4a08a23ed
SAMZA-2022: Integrate startpoints with SystemConsumers
This PR integrates `Startpoint` with the `SystemConsumers` multiplexer class. The general logic is: if a task has a `Startpoint` for an SSP and the `SystemConsumer` implements the `StartpointVisitor` interface, `SystemConsumers` applies the startpoint to that consumer via `Startpoint#apply(SystemStreamPartition, StartpointVisitor)`. Otherwise, `SystemConsumer#register(SystemStreamPartition, checkpointOffset: String)` is called, as it is today.
shanthoosh cameronlee314 prateekm - Please take a look; reviewing after the break is fine. If I don't respond to comments within 24-48 hours during my leave, please feel free to take over this PR and continue it in a new one.
Author: Daniel Nishimura <dn...@linkedin.com>
Reviewers: Shanthoosh Venkataraman <sp...@usc.edu>, Jake Maes <jm...@linkedin.com>, Cameron L <ca...@linkedin.com>
Closes #869 from dnishimura/samza-2022-startpoint-systemconsumers-integ
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4a08a23e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4a08a23e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4a08a23e
Branch: refs/heads/master
Commit: 4a08a23ede22c55fb6770e9cff372747a0fb283c
Parents: 18ebfce
Author: Daniel Nishimura <dn...@linkedin.com>
Authored: Tue Jan 22 16:38:40 2019 -0800
Committer: Shanthoosh Venkataraman <sp...@usc.edu>
Committed: Tue Jan 22 16:38:40 2019 -0800
----------------------------------------------------------------------
.../apache/samza/container/TaskInstance.scala | 4 +-
.../apache/samza/system/SystemConsumers.scala | 18 ++++-
.../org/apache/samza/task/TestAsyncRunLoop.java | 1 +
.../samza/system/TestSystemConsumers.scala | 78 +++++++++++++++++---
4 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index ef89581..ed16e2d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -163,10 +163,10 @@ class TaskInstance(
def registerConsumers {
debug("Registering consumers for taskName: %s" format taskName)
-
systemStreamPartitions.foreach(systemStreamPartition => {
val startingOffset = getStartingOffset(systemStreamPartition)
- consumerMultiplexer.register(systemStreamPartition, startingOffset)
+ val startpoint = offsetManager.getStartpoint(taskName, systemStreamPartition).getOrElse(null)
+ consumerMultiplexer.register(systemStreamPartition, startingOffset, startpoint)
metrics.addOffsetGauge(systemStreamPartition, () =>
if (sideInputSSPs.contains(systemStreamPartition)) {
sideInputStorageManager.getLastProcessedOffset(systemStreamPartition)
http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index c4fc095..2c9a3ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.util.{Logging, TimerUtil}
+import org.apache.samza.startpoint.{Startpoint, StartpointVisitor}
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.SamzaException
import java.util.ArrayDeque
@@ -34,6 +35,7 @@ import java.util.HashMap
import java.util.Queue
import java.util.Set
+
object SystemConsumers {
val DEFAULT_POLL_INTERVAL_MS = 50
val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
@@ -181,7 +183,7 @@ class SystemConsumers (
}
- def register(systemStreamPartition: SystemStreamPartition, offset: String) {
+ def register(systemStreamPartition: SystemStreamPartition, offset: String, startpoint: Startpoint) {
debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
@@ -192,10 +194,22 @@ class SystemConsumers (
metrics.registerSystemStreamPartition(systemStreamPartition)
unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
+
+ // Note regarding Startpoints and MessageChooser:
+ // Even if there is a startpoint for this SSP, passing in the checkpoint offset should not have any side-effects.
+ // Basically, the offset in the chooser is used in the special scenario where an SSP is both a broadcast and bootstrap stream
+ // and needs to decide what's the lowest starting offset for an SSP that spans across multiple tasks so it knows
+ // to keep the highest priority on the SSP starting from the lowest starting offset until the SSP is fully
+ // bootstrapped to the UPCOMING offset. The offset here is ignored otherwise.
chooser.register(systemStreamPartition, offset)
try {
- consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
+ val consumer = consumers(systemStreamPartition.getSystem)
+ if (startpoint != null && consumer.isInstanceOf[StartpointVisitor]) {
+ startpoint.apply(systemStreamPartition, consumer.asInstanceOf[StartpointVisitor])
+ } else {
+ consumer.register(systemStreamPartition, offset)
+ }
} catch {
case e: NoSuchElementException => throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem + "'s consumer.", e)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index f8c40dc..9701ed6 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -652,6 +652,7 @@ public class TestAsyncRunLoop {
when(offsetManager.getLastProcessedOffset(taskName2, ssp2)).thenReturn(Option.apply("0"));
when(offsetManager.getStartingOffset(taskName1, ssp1)).thenReturn(Option.apply(IncomingMessageEnvelope.END_OF_STREAM_OFFSET));
when(offsetManager.getStartingOffset(taskName2, ssp2)).thenReturn(Option.apply("1"));
+ when(offsetManager.getStartpoint(anyObject(), anyObject())).thenReturn(Option.empty());
TaskInstance taskInstance1 = createTaskInstance(mockStreamTask1, taskName1, ssp1, offsetManager, consumers);
TaskInstance taskInstance2 = createTaskInstance(mockStreamTask2, taskName2, ssp2, offsetManager, consumers);
http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index eb1a47e..2941236 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -26,6 +26,7 @@ import org.junit.Assert._
import org.junit.Test
import org.apache.samza.Partition
import org.apache.samza.serializers._
+import org.apache.samza.startpoint._
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.system.chooser.DefaultChooser
import org.apache.samza.system.chooser.MockMessageChooser
@@ -48,8 +49,8 @@ class TestSystemConsumers {
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
- consumers.register(systemStreamPartition0, "0")
- consumers.register(systemStreamPartition1, "1234")
+ consumers.register(systemStreamPartition0, "0", null)
+ consumers.register(systemStreamPartition1, "1234", null)
consumers.start
// Tell the consumer to respond with 1000 messages for SSP0, and no
@@ -110,7 +111,7 @@ class TestSystemConsumers {
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
- consumers.register(systemStreamPartition, "0")
+ consumers.register(systemStreamPartition, "0", null)
consumers.start
// Start should trigger a poll to the consumer.
@@ -173,7 +174,7 @@ class TestSystemConsumers {
def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
}, consumer, null)
- consumers.register(systemStreamPartition, "0")
+ consumers.register(systemStreamPartition, "0", null)
consumers.start
consumers.stop
@@ -215,7 +216,7 @@ class TestSystemConsumers {
// it should throw a SystemConsumersException because system2 does not have a consumer
var caughtRightException = false
try {
- consumers.register(systemStreamPartition2, "0")
+ consumers.register(systemStreamPartition2, "0", null)
} catch {
case e: SystemConsumersException => caughtRightException = true
case _: Throwable => caughtRightException = false
@@ -234,7 +235,7 @@ class TestSystemConsumers {
// throw exceptions when the deserialization has error
val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false)
- consumers.register(systemStreamPartition, "0")
+ consumers.register(systemStreamPartition, "0", null)
consumer(system).putBytesMessage
consumer(system).putStringMessage
consumers.start
@@ -251,7 +252,7 @@ class TestSystemConsumers {
// it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true
val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true)
- consumers2.register(systemStreamPartition, "0")
+ consumers2.register(systemStreamPartition, "0", null)
consumer(system).putBytesMessage
consumer(system).putStringMessage
consumer(system).putBytesMessage
@@ -303,8 +304,8 @@ class TestSystemConsumers {
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
- consumers.register(systemStreamPartition1, "0")
- consumers.register(systemStreamPartition2, "0")
+ consumers.register(systemStreamPartition1, "0", null)
+ consumers.register(systemStreamPartition2, "0", null)
consumers.start
// Start should trigger a poll to the consumer.
@@ -339,6 +340,41 @@ class TestSystemConsumers {
assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
}
+ @Test
+ def testSystemConsumersAndStartpointVisitor {
+ val system = "test-system"
+ val stream = "some-stream"
+ val systemStreamPartition1 = new SystemStreamPartition(system, stream, new Partition(1))
+ val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
+
+ val consumer = new TestStartpointConsumer
+ val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+ new SerdeManager, new SystemConsumersMetrics,
+ SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+ SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+ SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
+
+ consumers.register(systemStreamPartition1, "0", null)
+ assertNull(consumer.getVisitedStartpoint(systemStreamPartition1))
+
+ val startpointSpecific = new StartpointSpecific("1")
+ consumers.register(systemStreamPartition2, "0", startpointSpecific)
+ assertEquals(startpointSpecific, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+ val startpointTimestamp = new StartpointTimestamp(123456L)
+ consumers.register(systemStreamPartition2, "0", startpointTimestamp)
+ assertEquals(startpointTimestamp, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+ val startpointOldest = new StartpointOldest
+ consumers.register(systemStreamPartition2, "0", startpointOldest)
+ assertEquals(startpointOldest, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+ val startpointUpcoming = new StartpointUpcoming
+ consumers.register(systemStreamPartition2, "0", startpointUpcoming)
+ assertEquals(startpointUpcoming, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+ }
+
/**
* A simple MockSystemConsumer that keeps track of what was polled, and lets
* you define how many envelopes to return in the poll response. You can
@@ -384,6 +420,30 @@ class TestSystemConsumers {
def stop {}
def register { super.register(systemStreamPartition, "0") }
}
+
+ private class TestStartpointConsumer extends SerializingConsumer with StartpointVisitor {
+ var startpoints: Map[SystemStreamPartition, Startpoint] = Map()
+
+ override def visit(systemStreamPartition: SystemStreamPartition, startpointSpecific: StartpointSpecific) {
+ startpoints += systemStreamPartition -> startpointSpecific
+ }
+
+ override def visit(systemStreamPartition: SystemStreamPartition, startpointTimestamp: StartpointTimestamp) {
+ startpoints += systemStreamPartition -> startpointTimestamp
+ }
+
+ override def visit(systemStreamPartition: SystemStreamPartition, startpointUpcoming: StartpointUpcoming) {
+ startpoints += systemStreamPartition -> startpointUpcoming
+ }
+
+ override def visit(systemStreamPartition: SystemStreamPartition, startpointOldest: StartpointOldest) {
+ startpoints += systemStreamPartition -> startpointOldest
+ }
+
+ def getVisitedStartpoint(systemStreamPartition: SystemStreamPartition) : Startpoint = {
+ startpoints.getOrElse(systemStreamPartition, null)
+ }
+ }
}
object TestSystemConsumers {