You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/30 17:30:43 UTC

[samza] branch master updated: SAMZA-2220: Startpoints - Fully encapsulate resolution of starting offsets in OffsetManager (#1053)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new fee823b  SAMZA-2220: Startpoints - Fully encapsulate resolution of starting offsets in OffsetManager (#1053)
fee823b is described below

commit fee823bdae90d60094806a0b48668b8abd21bb5f
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Thu May 30 10:30:38 2019 -0700

    SAMZA-2220: Startpoints - Fully encapsulate resolution of starting offsets in OffsetManager (#1053)
    
    * SAMZA-2220: Startpoints - Fully encapsulate resolution of starting offsets in OffsetManager
    
    * Address comment from @rmatharu
    
    * Mention startpoints in OffsetManager javadoc.
---
 .../apache/samza/checkpoint/OffsetManager.scala    |  52 ++++++++--
 .../org/apache/samza/container/TaskInstance.scala  |  21 +---
 .../samza/checkpoint/TestOffsetManager.scala       |  78 +++++++++++++-
 .../apache/samza/container/TestTaskInstance.scala  | 114 ---------------------
 4 files changed, 124 insertions(+), 141 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 90f9668..cdeddb0 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -22,6 +22,7 @@ package org.apache.samza.checkpoint
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.annotation.InterfaceStability
 import org.apache.samza.config.StreamConfig.Config2Stream
@@ -110,14 +111,15 @@ object OffsetManager extends Logging {
  * OffsetManager does several things:
  *
  * <ul>
- * <li>Loads last checkpointed offset for all input SystemStreamPartitions in a
- * SamzaContainer.</li>
- * <li>Uses last checkpointed offset to figure out the next offset to start
- * reading from for each input SystemStreamPartition in a SamzaContainer</li>
+ * <li>Loads last checkpointed offset and startpoints for all input SystemStreamPartitions in a
+ * SamzaContainer. See SEP-18 for details.</li>
+ * <li>Uses last checkpointed offset or startpoint to figure out the next offset to start
+ * reading from for each input SystemStreamPartition in a SamzaContainer. Startpoints have a higher precedence than
+ * checkpoints.</li>
  * <li>Keep track of the last processed offset for each SystemStreamPartitions
  * in a SamzaContainer.</li>
  * <li>Checkpoints the last processed offset for each SystemStreamPartitions
- * in a SamzaContainer periodically to the CheckpointManager.</li>
+ * in a SamzaContainer periodically to the CheckpointManager and deletes any associated startpoints.</li>
  * </ul>
  *
  * All partitions must be registered before start is called, and start must be
@@ -137,7 +139,7 @@ class OffsetManager(
   val checkpointManager: CheckpointManager = null,
 
   /**
-    * Optional startpoint manager for overrided offsets.
+    * Optional startpoint manager for overridden offsets.
     */
   val startpointManager: StartpointManager = null,
 
@@ -535,9 +537,47 @@ class OffsetManager(
         startpoints
           .foreach(taskMap => taskMap._2
             .foreach(sspMap => info("Loaded startpoint: %s for SSP: %s and task: %s" format (sspMap._2, sspMap._1, taskMap._1))))
+        resolveStartpointsToStartingOffsets
       }
     }
   }
+
+  /**
+    * Applies the loaded startpoints on top of the starting offsets. For every task, each
+    * startpoint is resolved to a concrete offset through the SystemAdmin of its SSP's system,
+    * and every non-blank result replaces that SSP's starting offset, so startpoints take
+    * precedence over checkpoints. A startpoint that resolves to a blank offset, or whose
+    * resolution throws, is skipped and the SSP keeps its existing starting offset.
+    */
+  private def resolveStartpointsToStartingOffsets: Unit = {
+    for ((taskName, sspToStartpoint) <- startpoints) {
+      // Resolve each startpoint of this task, keeping only the offsets that resolved successfully.
+      val resolvedOffsets: Map[SystemStreamPartition, String] =
+        sspToStartpoint.foldLeft(Map[SystemStreamPartition, String]()) {
+          case (acc, (ssp, startpoint)) =>
+            try {
+              val systemAdmin: SystemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem)
+              val resolvedOffset: String = systemAdmin.resolveStartpointToOffset(ssp, startpoint)
+              if (StringUtils.isBlank(resolvedOffset)) {
+                acc
+              } else {
+                val withResolved = acc + (ssp -> resolvedOffset)
+                info("Resolved the startpoint: %s of system stream partition: %s to offset: %s." format(startpoint, ssp, resolvedOffset))
+                withResolved
+              }
+            } catch {
+              case e: Exception =>
+                error("Exception occurred when resolving startpoint: %s of system stream partition: %s to offset." format(startpoint, ssp), e)
+                acc
+            }
+        }
+
+      // Resolved offsets win over the task's existing (checkpoint-derived) starting offsets.
+      val existingOffsets: Map[SystemStreamPartition, String] = startingOffsets.getOrElse(taskName, Map())
+      startingOffsets += taskName -> (existingOffsets ++ resolvedOffsets)
+    }
+  }
+
   /**
    * Use defaultOffsets to get a next offset for every SystemStreamPartition
    * that was registered, but has no offset.
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 884244e..22879fa 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -153,26 +153,7 @@ class TaskInstance(
   def registerConsumers() {
     debug("Registering consumers for taskName: %s" format taskName)
     systemStreamPartitions.foreach(systemStreamPartition => {
-      var startingOffset: String = getStartingOffset(systemStreamPartition)
-      val startpointOption: Option[Startpoint] = offsetManager.getStartpoint(taskName, systemStreamPartition)
-      startpointOption match {
-        case Some(startpoint) => {
-          try {
-            val systemAdmin: SystemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
-            val resolvedOffset: String = systemAdmin.resolveStartpointToOffset(systemStreamPartition, startpoint)
-            if (StringUtils.isNotBlank(resolvedOffset)) {
-              startingOffset = resolvedOffset
-              info("Resolved the startpoint: %s of system stream partition: %s to offset: %s." format(startpoint,  systemStreamPartition, startingOffset))
-            }
-          } catch {
-            case e: Exception =>
-              error("Exception occurred when resolving startpoint: %s of system stream partition: %s to offset." format(startpoint, systemStreamPartition), e)
-          }
-        }
-        case None => {
-          debug("Startpoint does not exist for system stream partition: %s. Using the checkpointed offset: %s" format(systemStreamPartition, startingOffset))
-        }
-      }
+      val startingOffset: String = getStartingOffset(systemStreamPartition)
       consumerMultiplexer.register(systemStreamPartition, startingOffset)
       metrics.addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull)
     })
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 4c53ec2..50c793c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -22,9 +22,10 @@ package org.apache.samza.checkpoint
 import java.util
 import java.util.function.BiConsumer
 
+import com.google.common.collect.ImmutableMap
 import org.apache.samza.config.MapConfig
 import org.apache.samza.container.TaskName
-import org.apache.samza.startpoint.{StartpointManagerTestUtil, StartpointOldest, StartpointUpcoming}
+import org.apache.samza.startpoint.{Startpoint, StartpointManagerTestUtil, StartpointOldest, StartpointSpecific, StartpointUpcoming}
 import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
 import org.apache.samza.system._
 import org.apache.samza.{Partition, SamzaException}
@@ -135,6 +136,81 @@ class TestOffsetManager {
   }
 
   @Test
+  def testGetStartingOffsetWhenResolvedFromStartpoint: Unit = {
+    // A startpoint fanned out to the task must override the task's checkpoint-derived starting offset.
+    val taskName1 = new TaskName("c")
+    val systemStream1 = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream1, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream1.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "51", "52")).asJava)
+    val systemStreamMetadata = Map(systemStream1 -> testStreamMetadata)
+    val config = new MapConfig
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName1)
+    val startpointManagerUtil = getStartpointManagerUtil()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    val systemAdmin = mock(classOf[SystemAdmin])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    val testStartpoint = new StartpointSpecific("23")
+    Mockito.doReturn(testStartpoint.getSpecificOffset).when(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), refEq(testStartpoint))
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    val startpointManager = startpointManagerUtil.getStartpointManager
+    startpointManager.writeStartpoint(systemStreamPartition, testStartpoint)
+    startpointManager.fanOut(asTaskToSSPMap(taskName1, systemStreamPartition))
+    offsetManager.start // expected to load the fanned-out startpoint and resolve it via the mocked SystemAdmin
+    assertEquals(testStartpoint.getSpecificOffset, offsetManager.getStartingOffset(taskName1, systemStreamPartition).get)
+  }
+
+  @Test
+  def testGetStartingOffsetWhenResolveStartpointToOffsetIsNull: Unit = {
+    val taskName1 = new TaskName("c")
+    val systemStream1 = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream1, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream1.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "51", "52")).asJava)
+    val systemStreamMetadata = Map(systemStream1 -> testStreamMetadata)
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName1)
+    val startpointManagerUtil = getStartpointManagerUtil()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    val systemAdmin = mock(classOf[SystemAdmin])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    Mockito.doReturn(null).when(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), any[Startpoint]) // resolution yields no offset
+    Mockito.doReturn(ImmutableMap.of(systemStreamPartition, "46")).when(systemAdmin).getOffsetsAfter(any[util.Map[SystemStreamPartition, String]])
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    startpointManagerUtil.getStartpointManager.writeStartpoint(systemStreamPartition, new StartpointOldest) // without a startpoint, resolution would never be attempted
+    startpointManagerUtil.getStartpointManager.fanOut(asTaskToSSPMap(taskName1, systemStreamPartition))
+    offsetManager.start
+    Mockito.verify(systemAdmin, Mockito.atLeastOnce()).resolveStartpointToOffset(refEq(systemStreamPartition), any[Startpoint])
+    assertEquals("46", offsetManager.getStartingOffset(taskName1, systemStreamPartition).get) // falls back to the checkpoint-derived offset
+  }
+
+  @Test
+  def testGetStartingOffsetWhenResolveStartpointToOffsetThrows: Unit = {
+    val taskName1 = new TaskName("c")
+    val systemStream1 = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream1, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream1.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "51", "52")).asJava)
+    val systemStreamMetadata = Map(systemStream1 -> testStreamMetadata)
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName1)
+    val startpointManagerUtil = getStartpointManagerUtil()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    val systemAdmin = mock(classOf[SystemAdmin])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    Mockito.doThrow(new RuntimeException("mock startpoint resolution exception")).when(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), any[Startpoint])
+    Mockito.doReturn(ImmutableMap.of(systemStreamPartition, "46")).when(systemAdmin).getOffsetsAfter(any[util.Map[SystemStreamPartition, String]])
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    startpointManagerUtil.getStartpointManager.writeStartpoint(systemStreamPartition, new StartpointOldest) // without a startpoint, resolution would never be attempted
+    startpointManagerUtil.getStartpointManager.fanOut(asTaskToSSPMap(taskName1, systemStreamPartition))
+    offsetManager.start // the resolution failure must be logged and swallowed, not propagated
+    Mockito.verify(systemAdmin, Mockito.atLeastOnce()).resolveStartpointToOffset(refEq(systemStreamPartition), any[Startpoint])
+    assertEquals("46", offsetManager.getStartingOffset(taskName1, systemStreamPartition).get) // falls back to the checkpoint-derived offset
+  }
+
+  @Test
   def testGetCheckpointedOffsetMetric{
     val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 06a091c..921e91b 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -295,120 +295,6 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     assertTrue(taskInstance.ssp2CaughtupMapping(ssp))
   }
 
-  @Test
-  def testRegisterConsumersShouldUseTheCheckpointedOffsetWhenStartpointDoesNotExist(): Unit = {
-    val offsetManagerMock = mock[OffsetManager]
-    val cacheMock = mock[StreamMetadataCache]
-    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val startingOffset = "42"
-
-    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
-    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(None)
-    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
-    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
-    val taskInstance = new TaskInstance(this.task,
-      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
-      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
-      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
-      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
-      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
-    taskInstance.registerConsumers
-
-    verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
-    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
-    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
-  }
-
-  @Test
-  def testRegisterConsumersShouldUseStartpointtWhenItExists(): Unit = {
-    val offsetManagerMock = mock[OffsetManager]
-    val cacheMock = mock[StreamMetadataCache]
-    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val startingOffset = "52"
-    val resolvedOffset = "62"
-    val mockStartpoint: Startpoint = mock[Startpoint]
-
-    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
-    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
-    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
-    when(systemAdmin.resolveStartpointToOffset(systemStreamPartition, mockStartpoint)).thenReturn(resolvedOffset)
-    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
-    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
-    val taskInstance = new TaskInstance(this.task,
-      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
-      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
-      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
-      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
-      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
-    taskInstance.registerConsumers
-
-    verify(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
-    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
-    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
-  }
-
-  @Test
-  def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionThrowsUp(): Unit = {
-    val offsetManagerMock = mock[OffsetManager]
-    val cacheMock = mock[StreamMetadataCache]
-    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val startingOffset = "72"
-    val mockStartpoint: Startpoint = mock[Startpoint]
-
-    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
-    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
-    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
-    doThrow(new RuntimeException("Random exception")).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
-    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
-    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
-    val taskInstance = new TaskInstance(this.task,
-      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
-      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
-      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
-      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
-      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
-    taskInstance.registerConsumers
-
-    verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
-    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
-    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
-  }
-
-  @Test
-  def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionReturnsNull(): Unit = {
-    val offsetManagerMock = mock[OffsetManager]
-    val cacheMock = mock[StreamMetadataCache]
-    val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
-    val startingOffset = "72"
-    val mockStartpoint: Startpoint = mock[Startpoint]
-
-    when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
-    when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
-    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
-    doReturn(null).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
-    doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
-    doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
-    val taskInstance = new TaskInstance(this.task,
-      this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
-      offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
-      systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
-      jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
-      applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
-    taskInstance.registerConsumers
-
-    verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
-    verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
-    verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
-  }
-
   private def setupTaskInstance(
     applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]): Unit = {
     this.taskInstance = new TaskInstance(this.task,