You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/23 00:39:11 UTC

samza git commit: SAMZA-2022: Integrate startpoints with SystemConsumers

Repository: samza
Updated Branches:
  refs/heads/master 18ebfce33 -> 4a08a23ed


SAMZA-2022: Integrate startpoints with SystemConsumers

This PR integrates `Startpoint` with the `SystemConsumers` multiplexer class. The general logic is: if there is a `Startpoint` for an SSP in a task and the `SystemConsumer` implements the `StartpointVisitor` interface, the visitor method is applied via `Startpoint#apply(SystemStreamPartition, StartpointVisitor)`. Otherwise, `SystemConsumer#register(SystemStreamPartition, checkpointOffset: String)` is called, as it is today.

shanthoosh cameronlee314 prateekm - Please take a look... after the break is fine. If I don't respond to comments within 24-48 hours during my leave, please feel free to hijack this PR into another PR.

Author: Daniel Nishimura <dn...@linkedin.com>

Reviewers: Shanthoosh Venkataraman <sp...@usc.edu>, Jake Maes <jm...@linkedin.com>, Cameron L <ca...@linkedin.com>

Closes #869 from dnishimura/samza-2022-startpoint-systemconsumers-integ


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4a08a23e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4a08a23e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4a08a23e

Branch: refs/heads/master
Commit: 4a08a23ede22c55fb6770e9cff372747a0fb283c
Parents: 18ebfce
Author: Daniel Nishimura <dn...@linkedin.com>
Authored: Tue Jan 22 16:38:40 2019 -0800
Committer: Shanthoosh Venkataraman <sp...@usc.edu>
Committed: Tue Jan 22 16:38:40 2019 -0800

----------------------------------------------------------------------
 .../apache/samza/container/TaskInstance.scala   |  4 +-
 .../apache/samza/system/SystemConsumers.scala   | 18 ++++-
 .../org/apache/samza/task/TestAsyncRunLoop.java |  1 +
 .../samza/system/TestSystemConsumers.scala      | 78 +++++++++++++++++---
 4 files changed, 88 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index ef89581..ed16e2d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -163,10 +163,10 @@ class TaskInstance(
 
   def registerConsumers {
     debug("Registering consumers for taskName: %s" format taskName)
-
     systemStreamPartitions.foreach(systemStreamPartition => {
       val startingOffset = getStartingOffset(systemStreamPartition)
-      consumerMultiplexer.register(systemStreamPartition, startingOffset)
+      val startpoint = offsetManager.getStartpoint(taskName, systemStreamPartition).getOrElse(null)
+      consumerMultiplexer.register(systemStreamPartition, startingOffset, startpoint)
       metrics.addOffsetGauge(systemStreamPartition, () =>
         if (sideInputSSPs.contains(systemStreamPartition)) {
           sideInputStorageManager.getLastProcessedOffset(systemStreamPartition)

http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index c4fc095..2c9a3ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.util.{Logging, TimerUtil}
+import org.apache.samza.startpoint.{Startpoint, StartpointVisitor}
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
 import java.util.ArrayDeque
@@ -34,6 +35,7 @@ import java.util.HashMap
 import java.util.Queue
 import java.util.Set
 
+
 object SystemConsumers {
   val DEFAULT_POLL_INTERVAL_MS = 50
   val DEFAULT_NO_NEW_MESSAGES_TIMEOUT = 10
@@ -181,7 +183,7 @@ class SystemConsumers (
   }
 
 
-  def register(systemStreamPartition: SystemStreamPartition, offset: String) {
+  def register(systemStreamPartition: SystemStreamPartition, offset: String, startpoint: Startpoint) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
 
     if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
@@ -192,10 +194,22 @@ class SystemConsumers (
 
     metrics.registerSystemStreamPartition(systemStreamPartition)
     unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
+
+    // Note regarding Startpoints and MessageChooser:
+    // Even if there is a startpoint for this SSP, passing in the checkpoint offset should not have any side-effects.
+    // Basically, the offset in the chooser is used in the special scenario where an SSP is both a broadcast and bootstrap stream
+    // and needs to decide what's the lowest starting offset for an SSP that spans across multiple tasks so it knows
+    // to keep the highest priority on the SSP starting from the lowest starting offset until the SSP is fully
+    // bootstrapped to the UPCOMING offset. The offset here is ignored otherwise.
     chooser.register(systemStreamPartition, offset)
 
     try {
-      consumers(systemStreamPartition.getSystem).register(systemStreamPartition, offset)
+      val consumer = consumers(systemStreamPartition.getSystem)
+      if (startpoint != null && consumer.isInstanceOf[StartpointVisitor]) {
+        startpoint.apply(systemStreamPartition, consumer.asInstanceOf[StartpointVisitor])
+      } else {
+        consumer.register(systemStreamPartition, offset)
+      }
     } catch {
       case e: NoSuchElementException => throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem + "'s consumer.", e)
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index f8c40dc..9701ed6 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -652,6 +652,7 @@ public class TestAsyncRunLoop {
     when(offsetManager.getLastProcessedOffset(taskName2, ssp2)).thenReturn(Option.apply("0"));
     when(offsetManager.getStartingOffset(taskName1, ssp1)).thenReturn(Option.apply(IncomingMessageEnvelope.END_OF_STREAM_OFFSET));
     when(offsetManager.getStartingOffset(taskName2, ssp2)).thenReturn(Option.apply("1"));
+    when(offsetManager.getStartpoint(anyObject(), anyObject())).thenReturn(Option.empty());
 
     TaskInstance taskInstance1 = createTaskInstance(mockStreamTask1, taskName1, ssp1, offsetManager, consumers);
     TaskInstance taskInstance2 = createTaskInstance(mockStreamTask2, taskName2, ssp2, offsetManager, consumers);

http://git-wip-us.apache.org/repos/asf/samza/blob/4a08a23e/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index eb1a47e..2941236 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -26,6 +26,7 @@ import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.Partition
 import org.apache.samza.serializers._
+import org.apache.samza.startpoint._
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MockMessageChooser
@@ -48,8 +49,8 @@ class TestSystemConsumers {
                                         SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
                                         SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
 
-    consumers.register(systemStreamPartition0, "0")
-    consumers.register(systemStreamPartition1, "1234")
+    consumers.register(systemStreamPartition0, "0", null)
+    consumers.register(systemStreamPartition1, "1234", null)
     consumers.start
 
     // Tell the consumer to respond with 1000 messages for SSP0, and no
@@ -110,7 +111,7 @@ class TestSystemConsumers {
                                         SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
                                         SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
 
-    consumers.register(systemStreamPartition, "0")
+    consumers.register(systemStreamPartition, "0", null)
     consumers.start
 
     // Start should trigger a poll to the consumer.
@@ -173,7 +174,7 @@ class TestSystemConsumers {
       def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
     }, consumer, null)
 
-    consumers.register(systemStreamPartition, "0")
+    consumers.register(systemStreamPartition, "0", null)
     consumers.start
     consumers.stop
 
@@ -215,7 +216,7 @@ class TestSystemConsumers {
     // it should throw a SystemConsumersException because system2 does not have a consumer
     var caughtRightException = false
     try {
-      consumers.register(systemStreamPartition2, "0")
+      consumers.register(systemStreamPartition2, "0", null)
     } catch {
       case e: SystemConsumersException => caughtRightException = true
       case _: Throwable => caughtRightException = false
@@ -234,7 +235,7 @@ class TestSystemConsumers {
 
     // throw exceptions when the deserialization has error
     val consumers = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = false)
-    consumers.register(systemStreamPartition, "0")
+    consumers.register(systemStreamPartition, "0", null)
     consumer(system).putBytesMessage
     consumer(system).putStringMessage
     consumers.start
@@ -251,7 +252,7 @@ class TestSystemConsumers {
 
     // it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true
     val consumers2 = new SystemConsumers(msgChooser, consumer, serdeManager, dropDeserializationError = true)
-    consumers2.register(systemStreamPartition, "0")
+    consumers2.register(systemStreamPartition, "0", null)
     consumer(system).putBytesMessage
     consumer(system).putStringMessage
     consumer(system).putBytesMessage
@@ -303,8 +304,8 @@ class TestSystemConsumers {
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
       SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
 
-    consumers.register(systemStreamPartition1, "0")
-    consumers.register(systemStreamPartition2, "0")
+    consumers.register(systemStreamPartition1, "0", null)
+    consumers.register(systemStreamPartition2, "0", null)
     consumers.start
 
     // Start should trigger a poll to the consumer.
@@ -339,6 +340,41 @@ class TestSystemConsumers {
     assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
   }
 
+  @Test
+  def testSystemConsumersAndStartpointVisitor {
+    val system = "test-system"
+    val stream = "some-stream"
+    val systemStreamPartition1 = new SystemStreamPartition(system, stream, new Partition(1))
+    val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
+
+    val consumer = new TestStartpointConsumer
+    val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
+      new SerdeManager, new SystemConsumersMetrics,
+      SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
+      SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
+      SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
+
+    consumers.register(systemStreamPartition1, "0", null)
+    assertNull(consumer.getVisitedStartpoint(systemStreamPartition1))
+
+    val startpointSpecific = new StartpointSpecific("1")
+    consumers.register(systemStreamPartition2, "0", startpointSpecific)
+    assertEquals(startpointSpecific, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+    val startpointTimestamp = new StartpointTimestamp(123456L)
+    consumers.register(systemStreamPartition2, "0", startpointTimestamp)
+    assertEquals(startpointTimestamp, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+    val startpointOldest = new StartpointOldest
+    consumers.register(systemStreamPartition2, "0", startpointOldest)
+    assertEquals(startpointOldest, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+    val startpointUpcoming = new StartpointUpcoming
+    consumers.register(systemStreamPartition2, "0", startpointUpcoming)
+    assertEquals(startpointUpcoming, consumer.getVisitedStartpoint(systemStreamPartition2))
+
+  }
+
   /**
    * A simple MockSystemConsumer that keeps track of what was polled, and lets
    * you define how many envelopes to return in the poll response. You can
@@ -384,6 +420,30 @@ class TestSystemConsumers {
     def stop {}
     def register { super.register(systemStreamPartition, "0") }
   }
+
+  private class TestStartpointConsumer extends SerializingConsumer with StartpointVisitor {
+    var startpoints: Map[SystemStreamPartition, Startpoint] = Map()
+
+    override def visit(systemStreamPartition: SystemStreamPartition, startpointSpecific: StartpointSpecific) {
+      startpoints += systemStreamPartition -> startpointSpecific
+    }
+
+    override def visit(systemStreamPartition: SystemStreamPartition, startpointTimestamp: StartpointTimestamp) {
+      startpoints += systemStreamPartition -> startpointTimestamp
+    }
+
+    override def visit(systemStreamPartition: SystemStreamPartition, startpointUpcoming: StartpointUpcoming) {
+      startpoints += systemStreamPartition -> startpointUpcoming
+    }
+
+    override def visit(systemStreamPartition: SystemStreamPartition, startpointOldest: StartpointOldest) {
+      startpoints += systemStreamPartition -> startpointOldest
+    }
+
+    def getVisitedStartpoint(systemStreamPartition: SystemStreamPartition) : Startpoint = {
+      startpoints.getOrElse(systemStreamPartition, null)
+    }
+  }
 }
 
 object TestSystemConsumers {