You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/11 03:17:51 UTC

[samza] branch master updated: SAMZA-2189: Integrate startpoint resolution workflow with SamzaContainer startup sequence. (#1026)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a73414a  SAMZA-2189: Integrate startpoint resolution workflow with SamzaContainer startup sequence. (#1026)
a73414a is described below

commit a73414a03ffc9eb713f53500a5520309e562cb36
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Fri May 10 20:17:46 2019 -0700

    SAMZA-2189: Integrate startpoint resolution workflow with SamzaContainer startup sequence. (#1026)
    
    * Resolve startpoint to offset before registering with SystemConsumer.
    
    * Add unit tests.
    
    * Address review comments.
---
 .../org/apache/samza/container/TaskInstance.scala  |  41 ++++++--
 .../samza/storage/ContainerStorageManager.java     |   2 +-
 .../org/apache/samza/system/SystemConsumers.scala  |   9 +-
 .../apache/samza/container/TestTaskInstance.scala  | 115 ++++++++++++++++++++-
 .../apache/samza/system/TestSystemConsumers.scala  |  21 ++--
 5 files changed, 159 insertions(+), 29 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index fa17f24..ae6db22 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -23,14 +23,15 @@ package org.apache.samza.container
 import java.util.{Objects, Optional}
 import java.util.concurrent.ScheduledExecutorService
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.context._
 import org.apache.samza.job.model.{JobModel, TaskModel}
-import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
+import org.apache.samza.startpoint.Startpoint
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system._
@@ -141,15 +142,39 @@ class TaskInstance(
     collector.register
   }
 
-  def registerConsumers {
+  /**
+    * Computes the starting offset for the partitions assigned to the task and registers them with the underlying [[SystemConsumers]].
+    *
+    * Starting offset for a partition of the task is computed in the following manner:
+    *
+    * 1. If a startpoint exists for the task and system stream partition, and it resolves to a non-blank offset, then the resolved offset is used as the starting offset.
+    * 2. Otherwise (no startpoint exists, it resolves to a blank offset, or the resolution throws an exception), the checkpointed offset for the system stream partition is used as the starting offset.
+    */
+  def registerConsumers() {
     debug("Registering consumers for taskName: %s" format taskName)
     systemStreamPartitions.foreach(systemStreamPartition => {
-      val startingOffset = getStartingOffset(systemStreamPartition)
-      val startpoint = offsetManager.getStartpoint(taskName, systemStreamPartition).getOrElse(null)
-      consumerMultiplexer.register(systemStreamPartition, startingOffset, startpoint)
-      metrics.addOffsetGauge(systemStreamPartition, () =>
-          offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull
-        )
+      var startingOffset: String = getStartingOffset(systemStreamPartition)
+      val startpointOption: Option[Startpoint] = offsetManager.getStartpoint(taskName, systemStreamPartition)
+      startpointOption match {
+        case Some(startpoint) => {
+          try {
+            val systemAdmin: SystemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+            val resolvedOffset: String = systemAdmin.resolveStartpointToOffset(systemStreamPartition, startpoint)
+            if (StringUtils.isNotBlank(resolvedOffset)) {
+              startingOffset = resolvedOffset
+              info("Resolved the startpoint: %s of system stream partition: %s to offset: %s." format(startpoint,  systemStreamPartition, startingOffset))
+            }
+          } catch {
+            case e: Exception =>
+              error("Exception occurred when resolving startpoint: %s of system stream partition: %s to offset." format(startpoint, systemStreamPartition), e)
+          }
+        }
+        case None => {
+          debug("Startpoint does not exist for system stream partition: %s. Using the checkpointed offset: %s" format(systemStreamPartition, startingOffset))
+        }
+      }
+      consumerMultiplexer.register(systemStreamPartition, startingOffset)
+      metrics.addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull)
     })
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 0d0e0d8..85e10ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -667,7 +667,7 @@ public class ContainerStorageManager {
       }
 
       // register startingOffset with the sysConsumer and register a metric for it
-      sideInputSystemConsumers.register(ssp, startingOffset, null);
+      sideInputSystemConsumers.register(ssp, startingOffset);
       taskInstanceMetrics.get(sideInputStorageManagers.get(ssp).getTaskName()).addOffsetGauge(
           ssp, ScalaJavaUtil.toScalaFunction(() -> sideInputStorageManagers.get(ssp).getLastProcessedOffset(ssp)));
 
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 05f423d..563a104 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -31,7 +31,6 @@ import java.util.Set
 import scala.collection.JavaConverters._
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.util.{Logging, TimerUtil}
-import org.apache.samza.startpoint.Startpoint
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
 
@@ -197,7 +196,7 @@ class SystemConsumers (
   }
 
 
-  def register(systemStreamPartition: SystemStreamPartition, offset: String, startpoint: Startpoint) {
+  def register(systemStreamPartition: SystemStreamPartition, offset: String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
 
     if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
@@ -209,12 +208,6 @@ class SystemConsumers (
     metrics.registerSystemStreamPartition(systemStreamPartition)
     unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
 
-    // Note regarding Startpoints and MessageChooser:
-    // Even if there is a startpoint for this SSP, passing in the checkpoint offset should not have any side-effects.
-    // Basically, the offset in the chooser is used in the special scenario where an SSP is both a broadcast and bootstrap stream
-    // and needs to decide what's the lowest starting offset for an SSP that spans across multiple tasks so it knows
-    // to keep the highest priority on the SSP starting from the lowest starting offset until the SSP is fully
-    // bootstrapped to the UPCOMING offset. The offset here is ignored otherwise.
     chooser.register(systemStreamPartition, offset)
 
     try {
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 0386386..06a091c 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.container
 
-
 import java.util.Collections
 
 import org.apache.samza.Partition
@@ -27,6 +26,7 @@ import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
 import org.apache.samza.context.{TaskContext => _, _}
 import org.apache.samza.job.model.TaskModel
 import org.apache.samza.metrics.Counter
+import org.apache.samza.startpoint.Startpoint
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemAdmin, SystemConsumers, SystemStream, SystemStreamMetadata, _}
 import org.apache.samza.table.TableManager
@@ -295,6 +295,119 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     assertTrue(taskInstance.ssp2CaughtupMapping(ssp))
   }
 
+  @Test
+  def testRegisterConsumersShouldUseTheCheckpointedOffsetWhenStartpointDoesNotExist(): Unit = {
+    val offsetManagerMock = mock[OffsetManager]
+    val cacheMock = mock[StreamMetadataCache]
+    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val startingOffset = "42"
+
+    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(None)
+    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
+
+    val taskInstance = new TaskInstance(this.task,
+      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+    taskInstance.registerConsumers
+
+    verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+  }
+
+  @Test
+  def testRegisterConsumersShouldUseStartpointtWhenItExists(): Unit = {
+    val offsetManagerMock = mock[OffsetManager]
+    val cacheMock = mock[StreamMetadataCache]
+    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val startingOffset = "52"
+    val resolvedOffset = "62"
+    val mockStartpoint: Startpoint = mock[Startpoint]
+
+    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    when(systemAdmin.resolveStartpointToOffset(systemStreamPartition, mockStartpoint)).thenReturn(resolvedOffset)
+    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
+    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
+
+    val taskInstance = new TaskInstance(this.task,
+      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+    taskInstance.registerConsumers
+
+    verify(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
+    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+  }
+
+  @Test
+  def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionThrowsUp(): Unit = {
+    val offsetManagerMock = mock[OffsetManager]
+    val cacheMock = mock[StreamMetadataCache]
+    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val startingOffset = "72"
+    val mockStartpoint: Startpoint = mock[Startpoint]
+
+    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    doThrow(new RuntimeException("Random exception")).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
+    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
+
+    val taskInstance = new TaskInstance(this.task,
+      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+    taskInstance.registerConsumers
+
+    verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+  }
+
+  @Test
+  def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionReturnsNull(): Unit = {
+    val offsetManagerMock = mock[OffsetManager]
+    val cacheMock = mock[StreamMetadataCache]
+    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val startingOffset = "72"
+    val mockStartpoint: Startpoint = mock[Startpoint]
+
+    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    doReturn(null).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
+    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
+
+    val taskInstance = new TaskInstance(this.task,
+      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+    taskInstance.registerConsumers
+
+    verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+  }
 
   private def setupTaskInstance(
     applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]): Unit = {
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 875c4a9..037fbb3 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -53,8 +53,8 @@ class TestSystemConsumers {
                                         SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
                                         SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
 
-    consumers.register(systemStreamPartition0, "0", null)
-    consumers.register(systemStreamPartition1, "1234", null)
+    consumers.register(systemStreamPartition0, "0")
+    consumers.register(systemStreamPartition1, "1234")
     consumers.start
 
     // Tell the consumer to respond with 1000 messages for SSP0, and no
@@ -118,7 +118,7 @@ class TestSystemConsumers {
                                         SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
                                         SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
 
-    consumers.register(systemStreamPartition, "0", null)
+    consumers.register(systemStreamPartition, "0")
     consumers.start
 
     // Start should trigger a poll to the consumer.
@@ -184,7 +184,7 @@ class TestSystemConsumers {
       def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
     }, consumer, systemAdmins)
 
-    consumers.register(systemStreamPartition, "0", null)
+    consumers.register(systemStreamPartition, "0")
     consumers.start
     consumers.stop
 
@@ -226,7 +226,7 @@ class TestSystemConsumers {
     // it should throw a SystemConsumersException because system2 does not have a consumer
     var caughtRightException = false
     try {
-      consumers.register(systemStreamPartition2, "0", null)
+      consumers.register(systemStreamPartition2, "0")
     } catch {
       case e: SystemConsumersException => caughtRightException = true
       case _: Throwable => caughtRightException = false
@@ -247,7 +247,7 @@ class TestSystemConsumers {
 
     // throw exceptions when the deserialization has error
     val consumers = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = false)
-    consumers.register(systemStreamPartition, "0", null)
+    consumers.register(systemStreamPartition, "0")
     consumers.start
     consumer(system).putStringMessage
     consumer(system).putBytesMessage
@@ -264,7 +264,7 @@ class TestSystemConsumers {
 
     // it should not throw exceptions when deserialization fails if dropDeserializationError is set to true
     val consumers2 = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = true)
-    consumers2.register(systemStreamPartition, "0", null)
+    consumers2.register(systemStreamPartition, "0")
     consumers2.start
     consumer(system).putBytesMessage
     consumer(system).putStringMessage
@@ -318,8 +318,8 @@ class TestSystemConsumers {
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
       SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
 
-    consumers.register(systemStreamPartition1, "0", null)
-    consumers.register(systemStreamPartition2, "0", null)
+    consumers.register(systemStreamPartition1, "0")
+    consumers.register(systemStreamPartition2, "0")
     consumers.start
 
     // Start should trigger a poll to the consumer.
@@ -362,7 +362,6 @@ class TestSystemConsumers {
     val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
 
     val consumer = Mockito.mock(classOf[SystemConsumer])
-    val startpoint = Mockito.mock(classOf[Startpoint])
     val systemAdmins = Mockito.mock(classOf[SystemAdmins])
     Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
 
@@ -372,7 +371,7 @@ class TestSystemConsumers {
       SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
       SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
 
-    consumers.register(systemStreamPartition1, "0", startpoint)
+    consumers.register(systemStreamPartition1, "0")
   }
 
   /**