You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/30 17:30:43 UTC
[samza] branch master updated: SAMZA-2220: Startpoints - Fully
encapsulate resolution of starting offsets in OffsetManager (#1053)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new fee823b SAMZA-2220: Startpoints - Fully encapsulate resolution of starting offsets in OffsetManager (#1053)
fee823b is described below
commit fee823bdae90d60094806a0b48668b8abd21bb5f
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Thu May 30 10:30:38 2019 -0700
SAMZA-2220: Startpoints - Fully encapsulate resolution of starting offsets in OffsetManager (#1053)
* SAMZA-2220: Startpoints - Fully encapsulate resolution of starting offsets in OffsetManager
* Address comment from @rmatharu
* Mention startpoints in OffsetManager javadoc.
---
.../apache/samza/checkpoint/OffsetManager.scala | 52 ++++++++--
.../org/apache/samza/container/TaskInstance.scala | 21 +---
.../samza/checkpoint/TestOffsetManager.scala | 78 +++++++++++++-
.../apache/samza/container/TestTaskInstance.scala | 114 ---------------------
4 files changed, 124 insertions(+), 141 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 90f9668..cdeddb0 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -22,6 +22,7 @@ package org.apache.samza.checkpoint
import java.util.HashMap
import java.util.concurrent.ConcurrentHashMap
+import org.apache.commons.lang3.StringUtils
import org.apache.samza.SamzaException
import org.apache.samza.annotation.InterfaceStability
import org.apache.samza.config.StreamConfig.Config2Stream
@@ -110,14 +111,15 @@ object OffsetManager extends Logging {
* OffsetManager does several things:
*
* <ul>
- * <li>Loads last checkpointed offset for all input SystemStreamPartitions in a
- * SamzaContainer.</li>
- * <li>Uses last checkpointed offset to figure out the next offset to start
- * reading from for each input SystemStreamPartition in a SamzaContainer</li>
+ * <li>Loads last checkpointed offset and startpoints for all input SystemStreamPartitions in a
+ * SamzaContainer. See SEP-18 for details.</li>
+ * <li>Uses last checkpointed offset or startpoint to figure out the next offset to start
+ * reading from for each input SystemStreamPartition in a SamzaContainer. Startpoints have a higher precedence than
+ * checkpoints.</li>
* <li>Keep track of the last processed offset for each SystemStreamPartitions
* in a SamzaContainer.</li>
* <li>Checkpoints the last processed offset for each SystemStreamPartitions
- * in a SamzaContainer periodically to the CheckpointManager.</li>
+ * in a SamzaContainer periodically to the CheckpointManager and deletes any associated startpoints.</li>
* </ul>
*
* All partitions must be registered before start is called, and start must be
@@ -137,7 +139,7 @@ class OffsetManager(
val checkpointManager: CheckpointManager = null,
/**
- * Optional startpoint manager for overrided offsets.
+ * Optional startpoint manager for overridden offsets.
*/
val startpointManager: StartpointManager = null,
@@ -535,9 +537,47 @@ class OffsetManager(
startpoints
.foreach(taskMap => taskMap._2
.foreach(sspMap => info("Loaded startpoint: %s for SSP: %s and task: %s" format (sspMap._2, sspMap._1, taskMap._1))))
+ resolveStartpointsToStartingOffsets
}
}
}
+
+  /**
+   * For every task that has startpoints, resolves each startpoint to a concrete offset through the
+   * SystemAdmin of the partition's system and lets that offset take the place of the task's current
+   * starting offset for the partition. A startpoint that resolves to a blank offset, or whose
+   * resolution throws, is logged/skipped and leaves the existing starting offset untouched.
+   */
+  private def resolveStartpointsToStartingOffsets: Unit = {
+    startpoints.foreach { case (taskName, sspToStartpoint) =>
+      // Accumulate only the partitions whose startpoint produced a usable (non-blank) offset.
+      val resolvedOffsets = sspToStartpoint.foldLeft(Map[SystemStreamPartition, String]()) {
+        case (resolvedSoFar, (ssp, startpoint)) =>
+          try {
+            val offset: String = systemAdmins.getSystemAdmin(ssp.getSystem).resolveStartpointToOffset(ssp, startpoint)
+            if (StringUtils.isNotBlank(offset)) {
+              val withResolved = resolvedSoFar + (ssp -> offset)
+              info("Resolved the startpoint: %s of system stream partition: %s to offset: %s." format(startpoint, ssp, offset))
+              withResolved
+            } else {
+              resolvedSoFar
+            }
+          } catch {
+            // Best effort: a failed resolution must not prevent the remaining partitions from resolving.
+            case e: Exception =>
+              error("Exception occurred when resolving startpoint: %s of system stream partition: %s to offset." format(startpoint, ssp), e)
+              resolvedSoFar
+          }
+      }
+
+      // Resolved offsets win over whatever starting offsets the task already had.
+      val currentOffsets: Map[SystemStreamPartition, String] = startingOffsets.getOrElse(taskName, Map())
+      startingOffsets += taskName -> (currentOffsets ++ resolvedOffsets)
+    }
+  }
+
/**
* Use defaultOffsets to get a next offset for every SystemStreamPartition
* that was registered, but has no offset.
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 884244e..22879fa 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -153,26 +153,7 @@ class TaskInstance(
def registerConsumers() {
debug("Registering consumers for taskName: %s" format taskName)
systemStreamPartitions.foreach(systemStreamPartition => {
- var startingOffset: String = getStartingOffset(systemStreamPartition)
- val startpointOption: Option[Startpoint] = offsetManager.getStartpoint(taskName, systemStreamPartition)
- startpointOption match {
- case Some(startpoint) => {
- try {
- val systemAdmin: SystemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
- val resolvedOffset: String = systemAdmin.resolveStartpointToOffset(systemStreamPartition, startpoint)
- if (StringUtils.isNotBlank(resolvedOffset)) {
- startingOffset = resolvedOffset
- info("Resolved the startpoint: %s of system stream partition: %s to offset: %s." format(startpoint, systemStreamPartition, startingOffset))
- }
- } catch {
- case e: Exception =>
- error("Exception occurred when resolving startpoint: %s of system stream partition: %s to offset." format(startpoint, systemStreamPartition), e)
- }
- }
- case None => {
- debug("Startpoint does not exist for system stream partition: %s. Using the checkpointed offset: %s" format(systemStreamPartition, startingOffset))
- }
- }
+ val startingOffset: String = getStartingOffset(systemStreamPartition)
consumerMultiplexer.register(systemStreamPartition, startingOffset)
metrics.addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull)
})
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index 4c53ec2..50c793c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -22,9 +22,10 @@ package org.apache.samza.checkpoint
import java.util
import java.util.function.BiConsumer
+import com.google.common.collect.ImmutableMap
import org.apache.samza.config.MapConfig
import org.apache.samza.container.TaskName
-import org.apache.samza.startpoint.{StartpointManagerTestUtil, StartpointOldest, StartpointUpcoming}
+import org.apache.samza.startpoint.{Startpoint, StartpointManagerTestUtil, StartpointOldest, StartpointSpecific, StartpointUpcoming}
import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
import org.apache.samza.system._
import org.apache.samza.{Partition, SamzaException}
@@ -135,6 +136,81 @@ class TestOffsetManager {
}
@Test
+  def testGetStartingOffsetWhenResolvedFromStartpoint: Unit = {
+    // Verifies that a startpoint written and fanned out for a task overrides the starting offset
+    // reported by the OffsetManager once the SystemAdmin resolves it to a concrete offset.
+    val taskName1 = new TaskName("c")
+    val systemStream1 = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream1, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream1.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "51", "52")).asJava)
+    val systemStreamMetadata = Map(systemStream1 -> testStreamMetadata)
+    val config = new MapConfig
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName1)
+    val startpointManagerUtil = getStartpointManagerUtil()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    val systemAdmin = mock(classOf[SystemAdmin])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    val testStartpoint = new StartpointSpecific("23")
+    // The mocked admin resolves the specific startpoint to the very offset it carries.
+    Mockito.doReturn(testStartpoint.getSpecificOffset).when(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), refEq(testStartpoint))
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    // The startpoint must be written and fanned out to the task before start so it gets loaded.
+    val startpointManager = startpointManagerUtil.getStartpointManager
+    startpointManager.writeStartpoint(systemStreamPartition, testStartpoint)
+    startpointManager.fanOut(asTaskToSSPMap(taskName1, systemStreamPartition))
+    offsetManager.start
+    assertEquals(testStartpoint.getSpecificOffset, offsetManager.getStartingOffset(taskName1, systemStreamPartition).get)
+  }
+
+ @Test
+  def testGetStartingOffsetWhenResolveStartpointToOffsetIsNull: Unit = {
+    // Verifies that when a startpoint exists but the SystemAdmin resolves it to null, the
+    // OffsetManager keeps the starting offset derived from the checkpoint ("46" via getOffsetsAfter).
+    val taskName1 = new TaskName("c")
+    val systemStream1 = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream1, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream1.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "51", "52")).asJava)
+    val systemStreamMetadata = Map(systemStream1 -> testStreamMetadata)
+    val config = new MapConfig
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName1)
+    val startpointManagerUtil = getStartpointManagerUtil()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    val systemAdmin = mock(classOf[SystemAdmin])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    val testStartpoint = new StartpointSpecific("23")
+    // Stub resolution for the startpoint that is actually written below; stubbing for a null
+    // startpoint would never match and the resolution path would go untested.
+    Mockito.doReturn(null).when(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), refEq(testStartpoint))
+    Mockito.doReturn(ImmutableMap.of(systemStreamPartition, "46")).when(systemAdmin).getOffsetsAfter(any[util.Map[SystemStreamPartition, String]])
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    // A startpoint has to exist for the task, otherwise resolveStartpointToOffset is never invoked.
+    val startpointManager = startpointManagerUtil.getStartpointManager
+    startpointManager.writeStartpoint(systemStreamPartition, testStartpoint)
+    startpointManager.fanOut(asTaskToSSPMap(taskName1, systemStreamPartition))
+    offsetManager.start
+    // Guard against a vacuous pass: resolution must really have been attempted.
+    Mockito.verify(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), refEq(testStartpoint))
+    assertEquals("46", offsetManager.getStartingOffset(taskName1, systemStreamPartition).get)
+  }
+
+ @Test
+  def testGetStartingOffsetWhenResolveStartpointToOffsetThrows: Unit = {
+    // Verifies that when a startpoint exists but resolving it throws, the OffsetManager swallows the
+    // failure and keeps the starting offset derived from the checkpoint ("46" via getOffsetsAfter).
+    val taskName1 = new TaskName("c")
+    val systemStream1 = new SystemStream("test-system", "test-stream")
+    val partition = new Partition(0)
+    val systemStreamPartition = new SystemStreamPartition(systemStream1, partition)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream1.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "51", "52")).asJava)
+    val systemStreamMetadata = Map(systemStream1 -> testStreamMetadata)
+    val config = new MapConfig
+    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName1)
+    val startpointManagerUtil = getStartpointManagerUtil()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    val systemAdmin = mock(classOf[SystemAdmin])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+    val testStartpoint = new StartpointSpecific("23")
+    // Stub the failure for the startpoint that is actually written below; stubbing for a null
+    // startpoint would never match and the exception path would go untested.
+    Mockito.doThrow(new RuntimeException("mock startpoint resolution exception")).when(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), refEq(testStartpoint))
+    Mockito.doReturn(ImmutableMap.of(systemStreamPartition, "46")).when(systemAdmin).getOffsetsAfter(any[util.Map[SystemStreamPartition, String]])
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    offsetManager.register(taskName1, Set(systemStreamPartition))
+    // A startpoint has to exist for the task, otherwise resolveStartpointToOffset is never invoked.
+    val startpointManager = startpointManagerUtil.getStartpointManager
+    startpointManager.writeStartpoint(systemStreamPartition, testStartpoint)
+    startpointManager.fanOut(asTaskToSSPMap(taskName1, systemStreamPartition))
+    offsetManager.start
+    // Guard against a vacuous pass: resolution must really have been attempted (and thrown).
+    Mockito.verify(systemAdmin).resolveStartpointToOffset(refEq(systemStreamPartition), refEq(testStartpoint))
+    assertEquals("46", offsetManager.getStartingOffset(taskName1, systemStreamPartition).get)
+  }
+
+ @Test
def testGetCheckpointedOffsetMetric{
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 06a091c..921e91b 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -295,120 +295,6 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
assertTrue(taskInstance.ssp2CaughtupMapping(ssp))
}
- @Test
- def testRegisterConsumersShouldUseTheCheckpointedOffsetWhenStartpointDoesNotExist(): Unit = {
- val offsetManagerMock = mock[OffsetManager]
- val cacheMock = mock[StreamMetadataCache]
- val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
- val startingOffset = "42"
-
- when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
- when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(None)
- doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
- doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
- val taskInstance = new TaskInstance(this.task,
- this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
- offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
- systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
- jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
- applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
- taskInstance.registerConsumers
-
- verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
- verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
- verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
- }
-
- @Test
- def testRegisterConsumersShouldUseStartpointtWhenItExists(): Unit = {
- val offsetManagerMock = mock[OffsetManager]
- val cacheMock = mock[StreamMetadataCache]
- val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
- val startingOffset = "52"
- val resolvedOffset = "62"
- val mockStartpoint: Startpoint = mock[Startpoint]
-
- when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
- when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
- when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
- when(systemAdmin.resolveStartpointToOffset(systemStreamPartition, mockStartpoint)).thenReturn(resolvedOffset)
- doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
- doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
- val taskInstance = new TaskInstance(this.task,
- this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
- offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
- systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
- jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
- applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
- taskInstance.registerConsumers
-
- verify(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
- verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
- verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
- }
-
- @Test
- def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionThrowsUp(): Unit = {
- val offsetManagerMock = mock[OffsetManager]
- val cacheMock = mock[StreamMetadataCache]
- val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
- val startingOffset = "72"
- val mockStartpoint: Startpoint = mock[Startpoint]
-
- when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
- when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
- when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
- doThrow(new RuntimeException("Random exception")).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
- doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
- doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
- val taskInstance = new TaskInstance(this.task,
- this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
- offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
- systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
- jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
- applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
- taskInstance.registerConsumers
-
- verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
- verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
- verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
- }
-
- @Test
- def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionReturnsNull(): Unit = {
- val offsetManagerMock = mock[OffsetManager]
- val cacheMock = mock[StreamMetadataCache]
- val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
- val startingOffset = "72"
- val mockStartpoint: Startpoint = mock[Startpoint]
-
- when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
- when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
- when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
- doReturn(null).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
- doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
- doNothing().when(this.metrics).addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(TASK_NAME, systemStreamPartition).orNull)
-
- val taskInstance = new TaskInstance(this.task,
- this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
- offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
- systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
- jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
- applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
-
- taskInstance.registerConsumers
-
- verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
- verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
- verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
- }
-
private def setupTaskInstance(
applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]): Unit = {
this.taskInstance = new TaskInstance(this.task,