You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/11 03:17:51 UTC
[samza] branch master updated: SAMZA-2189: Integrate startpoint
resolution workflow with SamzaContainer startup sequence. (#1026)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a73414a SAMZA-2189: Integrate startpoint resolution workflow with SamzaContainer startup sequence. (#1026)
a73414a is described below
commit a73414a03ffc9eb713f53500a5520309e562cb36
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Fri May 10 20:17:46 2019 -0700
SAMZA-2189: Integrate startpoint resolution workflow with SamzaContainer startup sequence. (#1026)
* Resolve startpoint to offset before registering with SystemConsumer.
* Add unit tests.
* Address review comments.
---
.../org/apache/samza/container/TaskInstance.scala | 41 ++++++--
.../samza/storage/ContainerStorageManager.java | 2 +-
.../org/apache/samza/system/SystemConsumers.scala | 9 +-
.../apache/samza/container/TestTaskInstance.scala | 115 ++++++++++++++++++++-
.../apache/samza/system/TestSystemConsumers.scala | 21 ++--
5 files changed, 159 insertions(+), 29 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index fa17f24..ae6db22 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -23,14 +23,15 @@ package org.apache.samza.container
import java.util.{Objects, Optional}
import java.util.concurrent.ScheduledExecutorService
+import org.apache.commons.lang3.StringUtils
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.OffsetManager
import org.apache.samza.config.Config
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.context._
import org.apache.samza.job.model.{JobModel, TaskModel}
-import org.apache.samza.metrics.MetricsReporter
import org.apache.samza.scheduler.{CallbackSchedulerImpl, ScheduledCallback}
+import org.apache.samza.startpoint.Startpoint
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system._
@@ -141,15 +142,39 @@ class TaskInstance(
collector.register
}
- def registerConsumers {
+ /**
+ * Computes the starting offset for each partition assigned to this task and registers it with the underlying [[SystemConsumers]].
+ *
+ * The starting offset for a partition of the task is computed as follows:
+ *
+ * 1. If a startpoint exists for the task and system stream partition, and the partition's [[SystemAdmin]] resolves it to a non-blank offset, the resolved offset is used as the starting offset.
+ * 2. Otherwise (no startpoint, a blank or null resolved offset, or a non-fatal failure while resolving), the checkpointed offset for the system stream partition is used as the starting offset.
+ *
+ * An offset gauge reporting the last processed offset is also registered for every partition.
+ */
def registerConsumers() {
debug("Registering consumers for taskName: %s" format taskName)
systemStreamPartitions.foreach(systemStreamPartition => {
- val startingOffset = getStartingOffset(systemStreamPartition)
- val startpoint = offsetManager.getStartpoint(taskName, systemStreamPartition).getOrElse(null)
- consumerMultiplexer.register(systemStreamPartition, startingOffset, startpoint)
- metrics.addOffsetGauge(systemStreamPartition, () =>
- offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull
- )
+ // The checkpointed offset is looked up first: it is the fallback whenever startpoint resolution yields no usable offset.
+ val checkpointedOffset: String = getStartingOffset(systemStreamPartition)
+ val startingOffset: String = offsetManager.getStartpoint(taskName, systemStreamPartition) match {
+ case Some(startpoint) =>
+ try {
+ val systemAdmin: SystemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+ val resolvedOffset: String = systemAdmin.resolveStartpointToOffset(systemStreamPartition, startpoint)
+ if (StringUtils.isNotBlank(resolvedOffset)) {
+ info("Resolved the startpoint: %s of system stream partition: %s to offset: %s." format(startpoint, systemStreamPartition, resolvedOffset))
+ resolvedOffset
+ } else {
+ warn("Startpoint: %s of system stream partition: %s resolved to a blank offset. Using the checkpointed offset: %s." format(startpoint, systemStreamPartition, checkpointedOffset))
+ checkpointedOffset
+ }
+ } catch {
+ // NonFatal (rather than Exception) lets InterruptedException and fatal JVM errors propagate instead of being swallowed during startup.
+ case scala.util.control.NonFatal(e) =>
+ error("Exception occurred when resolving startpoint: %s of system stream partition: %s to offset." format(startpoint, systemStreamPartition), e)
+ checkpointedOffset
+ }
+ case None =>
+ debug("Startpoint does not exist for system stream partition: %s. Using the checkpointed offset: %s" format(systemStreamPartition, checkpointedOffset))
+ checkpointedOffset
+ }
+ consumerMultiplexer.register(systemStreamPartition, startingOffset)
+ metrics.addOffsetGauge(systemStreamPartition, () => offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).orNull)
})
}
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 0d0e0d8..85e10ea 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -667,7 +667,7 @@ public class ContainerStorageManager {
}
// register startingOffset with the sysConsumer and register a metric for it
- sideInputSystemConsumers.register(ssp, startingOffset, null);
+ sideInputSystemConsumers.register(ssp, startingOffset);
taskInstanceMetrics.get(sideInputStorageManagers.get(ssp).getTaskName()).addOffsetGauge(
ssp, ScalaJavaUtil.toScalaFunction(() -> sideInputStorageManagers.get(ssp).getLastProcessedOffset(ssp)));
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 05f423d..563a104 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -31,7 +31,6 @@ import java.util.Set
import scala.collection.JavaConverters._
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.util.{Logging, TimerUtil}
-import org.apache.samza.startpoint.Startpoint
import org.apache.samza.system.chooser.MessageChooser
import org.apache.samza.SamzaException
@@ -197,7 +196,7 @@ class SystemConsumers (
}
- def register(systemStreamPartition: SystemStreamPartition, offset: String, startpoint: Startpoint) {
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) {
debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
@@ -209,12 +208,6 @@ class SystemConsumers (
metrics.registerSystemStreamPartition(systemStreamPartition)
unprocessedMessagesBySSP.put(systemStreamPartition, new ArrayDeque[IncomingMessageEnvelope]())
- // Note regarding Startpoints and MessageChooser:
- // Even if there is a startpoint for this SSP, passing in the checkpoint offset should not have any side-effects.
- // Basically, the offset in the chooser is used in the special scenario where an SSP is both a broadcast and bootstrap stream
- // and needs to decide what's the lowest starting offset for an SSP that spans across multiple tasks so it knows
- // to keep the highest priority on the SSP starting from the lowest starting offset until the SSP is fully
- // bootstrapped to the UPCOMING offset. The offset here is ignored otherwise.
chooser.register(systemStreamPartition, offset)
try {
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 0386386..06a091c 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -19,7 +19,6 @@
package org.apache.samza.container
-
import java.util.Collections
import org.apache.samza.Partition
@@ -27,6 +26,7 @@ import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
import org.apache.samza.context.{TaskContext => _, _}
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.Counter
+import org.apache.samza.startpoint.Startpoint
import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemAdmin, SystemConsumers, SystemStream, SystemStreamMetadata, _}
import org.apache.samza.table.TableManager
@@ -295,6 +295,119 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
assertTrue(taskInstance.ssp2CaughtupMapping(ssp))
}
+ /**
+ * Without a startpoint, the checkpointed offset must be registered with the consumer multiplexer unchanged.
+ */
+ @Test
+ def testRegisterConsumersShouldUseTheCheckpointedOffsetWhenStartpointDoesNotExist(): Unit = {
+ val offsetManagerMock = mock[OffsetManager]
+ val cacheMock = mock[StreamMetadataCache]
+ val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val startingOffset = "42"
+
+ when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+ when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(None)
+ doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+
+ val taskInstance = new TaskInstance(this.task,
+ this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+ offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+ systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+ jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+ applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+ taskInstance.registerConsumers()
+
+ verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+ verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+ verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+ }
+
+ /**
+ * When a startpoint exists and resolves to a non-blank offset, the resolved offset (not the checkpoint) must be registered.
+ */
+ @Test
+ def testRegisterConsumersShouldUseStartpointtWhenItExists(): Unit = {
+ val offsetManagerMock = mock[OffsetManager]
+ val cacheMock = mock[StreamMetadataCache]
+ val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val startingOffset = "52"
+ val resolvedOffset = "62"
+ val mockStartpoint: Startpoint = mock[Startpoint]
+
+ when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+ when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
+ when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+ when(systemAdmin.resolveStartpointToOffset(systemStreamPartition, mockStartpoint)).thenReturn(resolvedOffset)
+ doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
+
+ val taskInstance = new TaskInstance(this.task,
+ this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+ offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+ systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+ jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+ applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+ taskInstance.registerConsumers()
+
+ verify(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
+ verify(this.consumerMultiplexer).register(systemStreamPartition, resolvedOffset)
+ verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+ verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+ }
+
+ /**
+ * When resolving the startpoint throws, registration must still succeed using the checkpointed offset.
+ */
+ @Test
+ def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionThrowsUp(): Unit = {
+ val offsetManagerMock = mock[OffsetManager]
+ val cacheMock = mock[StreamMetadataCache]
+ val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val startingOffset = "72"
+ val mockStartpoint: Startpoint = mock[Startpoint]
+
+ when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+ when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
+ when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+ doThrow(new RuntimeException("Random exception")).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
+ doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+
+ val taskInstance = new TaskInstance(this.task,
+ this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+ offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+ systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+ jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+ applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+ taskInstance.registerConsumers()
+
+ // Confirms the failing resolution path was actually taken before the fallback.
+ verify(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
+ verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+ verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+ verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+ }
+
+ /**
+ * When the startpoint resolves to null, registration must fall back to the checkpointed offset.
+ */
+ @Test
+ def testRegisterConsumersShouldUseCheckpointedOffsetWhenStartpointResolutionReturnsNull(): Unit = {
+ val offsetManagerMock = mock[OffsetManager]
+ val cacheMock = mock[StreamMetadataCache]
+ val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val startingOffset = "72"
+ val mockStartpoint: Startpoint = mock[Startpoint]
+
+ when(offsetManagerMock.getStartingOffset(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(startingOffset))
+ when(offsetManagerMock.getStartpoint(TASK_NAME, systemStreamPartition)).thenReturn(Option.apply(mockStartpoint))
+ when(systemAdmins.getSystemAdmin("test-system")).thenReturn(systemAdmin)
+ doReturn(null).when(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
+ doNothing().when(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+
+ val taskInstance = new TaskInstance(this.task,
+ this.taskModel, this.metrics, this.systemAdmins, this.consumerMultiplexer, this.collector,
+ offsetManager = offsetManagerMock, storageManager = this.taskStorageManager, tableManager = this.taskTableManager,
+ systemStreamPartitions = Set(systemStreamPartition), exceptionHandler = this.taskInstanceExceptionHandler, streamMetadataCache = cacheMock,
+ jobContext = this.jobContext, containerContext = this.containerContext, applicationContainerContextOption = Some(this.applicationContainerContext),
+ applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory), externalContextOption = Some(this.externalContext))
+
+ taskInstance.registerConsumers()
+
+ // Confirms resolution was attempted before falling back to the checkpointed offset.
+ verify(systemAdmin).resolveStartpointToOffset(systemStreamPartition, mockStartpoint)
+ verify(this.consumerMultiplexer).register(systemStreamPartition, startingOffset)
+ verify(offsetManagerMock).getStartpoint(TASK_NAME, systemStreamPartition)
+ verify(offsetManagerMock).getStartingOffset(TASK_NAME, systemStreamPartition)
+ }
private def setupTaskInstance(
applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]]): Unit = {
diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
index 875c4a9..037fbb3 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
@@ -53,8 +53,8 @@ class TestSystemConsumers {
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
- consumers.register(systemStreamPartition0, "0", null)
- consumers.register(systemStreamPartition1, "1234", null)
+ consumers.register(systemStreamPartition0, "0")
+ consumers.register(systemStreamPartition1, "1234")
consumers.start
// Tell the consumer to respond with 1000 messages for SSP0, and no
@@ -118,7 +118,7 @@ class TestSystemConsumers {
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => now)
- consumers.register(systemStreamPartition, "0", null)
+ consumers.register(systemStreamPartition, "0")
consumers.start
// Start should trigger a poll to the consumer.
@@ -184,7 +184,7 @@ class TestSystemConsumers {
def register(systemStreamPartition: SystemStreamPartition, offset: String) = chooserRegistered += systemStreamPartition -> offset
}, consumer, systemAdmins)
- consumers.register(systemStreamPartition, "0", null)
+ consumers.register(systemStreamPartition, "0")
consumers.start
consumers.stop
@@ -226,7 +226,7 @@ class TestSystemConsumers {
// it should throw a SystemConsumersException because system2 does not have a consumer
var caughtRightException = false
try {
- consumers.register(systemStreamPartition2, "0", null)
+ consumers.register(systemStreamPartition2, "0")
} catch {
case e: SystemConsumersException => caughtRightException = true
case _: Throwable => caughtRightException = false
@@ -247,7 +247,7 @@ class TestSystemConsumers {
// throw exceptions when the deserialization has error
val consumers = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = false)
- consumers.register(systemStreamPartition, "0", null)
+ consumers.register(systemStreamPartition, "0")
consumers.start
consumer(system).putStringMessage
consumer(system).putBytesMessage
@@ -264,7 +264,7 @@ class TestSystemConsumers {
// it should not throw exceptions when deserializaion fails if dropDeserializationError is set to true
val consumers2 = new SystemConsumers(msgChooser, consumer, systemAdmins, serdeManager, dropDeserializationError = true)
- consumers2.register(systemStreamPartition, "0", null)
+ consumers2.register(systemStreamPartition, "0")
consumers2.start
consumer(system).putBytesMessage
consumer(system).putStringMessage
@@ -318,8 +318,8 @@ class TestSystemConsumers {
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
- consumers.register(systemStreamPartition1, "0", null)
- consumers.register(systemStreamPartition2, "0", null)
+ consumers.register(systemStreamPartition1, "0")
+ consumers.register(systemStreamPartition2, "0")
consumers.start
// Start should trigger a poll to the consumer.
@@ -362,7 +362,6 @@ class TestSystemConsumers {
val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
val consumer = Mockito.mock(classOf[SystemConsumer])
- val startpoint = Mockito.mock(classOf[Startpoint])
val systemAdmins = Mockito.mock(classOf[SystemAdmins])
Mockito.when(systemAdmins.getSystemAdmin(system)).thenReturn(Mockito.mock(classOf[SystemAdmin]))
@@ -372,7 +371,7 @@ class TestSystemConsumers {
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
- consumers.register(systemStreamPartition1, "0", startpoint)
+ consumers.register(systemStreamPartition1, "0")
}
/**