You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/13 19:07:53 UTC
samza git commit: SAMZA-1100;
Exception when using a stream as both bootstrap and broadcast.
Repository: samza
Updated Branches:
refs/heads/master 5a88b9e47 -> ffd04d9d6
SAMZA-1100; Exception when using a stream as both bootstrap and broadcast.
When a task input stream is used as both a broadcast and a bootstrap stream in a Samza job, BootstrappingChooser marks the stream as bootstrapped as soon as a single task finishes consuming all the SystemStreamPartitions. (This happens when the starting offset of every partition in the input stream is of type UPCOMING.) This patch fixes the problem by marking a stream as bootstrapped only when all the SystemStreamPartitions in an input stream have been consumed by all the expected tasks.
More details here : https://issues.apache.org/jira/browse/SAMZA-1100
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Prateek Maheshwari<pr...@linkedin.com>
Closes #68 from shanthoosh/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ffd04d9d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ffd04d9d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ffd04d9d
Branch: refs/heads/master
Commit: ffd04d9d674437b44d438e3e372aa0cbbf6b7044
Parents: 5a88b9e
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Mon Mar 13 12:07:45 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon Mar 13 12:07:45 2017 -0700
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 2 +-
.../system/chooser/BootstrappingChooser.scala | 59 +++++++++++++++-----
.../samza/system/chooser/DefaultChooser.scala | 23 +++++---
.../chooser/TestBootstrappingChooser.scala | 37 ++++++++++--
.../system/chooser/TestDefaultChooser.scala | 6 +-
5 files changed, 98 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89522dc..b00e10f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -354,7 +354,7 @@ object SamzaContainer extends Logging {
val chooserFactory = Util.getObj[MessageChooserFactory](chooserFactoryClassName)
- val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry)
+ val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry, systemAdmins)
info("Setting up metrics reporters.")
http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index fc99c84..f71bcfb 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -19,19 +19,21 @@
package org.apache.samza.system.chooser
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.SamzaException
import org.apache.samza.util.Logging
import org.apache.samza.metrics.MetricsHelper
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStreamMetadata
-import scala.collection.JavaConversions._
-import org.apache.samza.SamzaException
import org.apache.samza.system.SystemStreamMetadata.OffsetType
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
/**
* BootstrappingChooser is a composable MessageChooser that only chooses
* an envelope when it's received at least one envelope for each SystemStream.
@@ -69,7 +71,13 @@ class BootstrappingChooser(
/**
* An object that holds all of the metrics related to bootstrapping.
*/
- metrics: BootstrappingChooserMetrics = new BootstrappingChooserMetrics) extends MessageChooser with Logging {
+ metrics: BootstrappingChooserMetrics = new BootstrappingChooserMetrics,
+
+ /**
+ * A map from system name to SystemAdmin that is used for
+ * offset comparisons.
+ */
+ systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
/**
* The number of lagging partitions for each SystemStream that's behind.
@@ -91,9 +99,10 @@ class BootstrappingChooser(
.toSet
/**
- * Store all the systemStreamPartitions registered
+ * Mapping from the systemStreamPartition to the lowest registered offset.
+ * When multiple offsets are registered for a system stream partition, lowest offset is chosen.
*/
- var registeredSystemStreamPartitions = Set[SystemStreamPartition]()
+ var registeredSystemStreamPartitions = mutable.Map[SystemStreamPartition, String]()
/**
* The number of lagging partitions that the underlying wrapped chooser has
@@ -102,8 +111,16 @@ class BootstrappingChooser(
var updatedSystemStreams = Map[SystemStream, Int]()
def start = {
+ for ((systemStreamPartition, offset) <- registeredSystemStreamPartitions) {
+ // If the offset we're starting to consume from is the same as the upcoming
+ // offset for this system stream partition, then we've already read all
+ // messages in the stream, and we're at head for this system stream
+ // partition.
+ checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
+ }
+
// remove the systemStreamPartitions not registered.
- laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
+ laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.keys.contains(_))
systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}
debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
@@ -120,15 +137,27 @@ class BootstrappingChooser(
override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
debug("Registering stream partition with offset: %s, %s" format (systemStreamPartition, offset))
- // If the offset we're starting to consume from is the same as the upcoming
- // offset for this system stream partition, then we've already read all
- // messages in the stream, and we're at head for this system stream
- // partition.
- checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
-
wrapped.register(systemStreamPartition, offset)
- registeredSystemStreamPartitions += systemStreamPartition
+ val system = systemStreamPartition.getSystem
+ val systemAdmin = systemAdmins.getOrElse(system, throw new SamzaException("SystemAdmin is undefined for System: %s" format system))
+ /**
+ * SAMZA-1100: When an input SystemStream is consumed as both bootstrap and broadcast,
+ * BootstrappingChooser should record the lowest offset for each registered SystemStreamPartition.
+ * When multiple tasks processing a SystemStreamPartition run within a container
+ * and share the same chooser, the lowest offset should be chosen as the starting offset.
+ */
+ if (!registeredSystemStreamPartitions.contains(systemStreamPartition)) {
+ registeredSystemStreamPartitions += systemStreamPartition -> offset
+ } else if (offset != null) {
+ val existingOffset = registeredSystemStreamPartitions(systemStreamPartition)
+ val comparatorResult: Integer = systemAdmin.offsetComparator(existingOffset, offset)
+ if (comparatorResult == null) {
+ warn("Existing offset: %s and incoming offset: %s of system stream partition: %s are not comparable." format (existingOffset, offset, systemStreamPartition))
+ } else if (comparatorResult > 0) {
+ registeredSystemStreamPartitions += systemStreamPartition -> offset
+ }
+ }
}
def update(envelope: IncomingMessageEnvelope) {
http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
index b433713..c0805c4 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -22,14 +22,14 @@ package org.apache.samza.system.chooser
import org.apache.samza.SamzaException
import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemStream, SystemStreamMetadata, SystemStreamPartition}
import org.apache.samza.util.Logging
import scala.collection.JavaConverters._
object DefaultChooser extends Logging {
- def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry) = {
+ def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry, systemAdmins: Map[String, SystemAdmin]) = {
val chooserConfig = new DefaultChooserConfig(config)
val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None
@@ -65,8 +65,8 @@ object DefaultChooser extends Logging {
debug("Got bootstrap stream metadata: %s" format bootstrapStreamMetadata)
val priorities = if (usePriority) {
- // Ordering is important here. Overrides Int.MaxValue default for
- // bootstrap streams with explicitly configured values, in cases where
+ // Ordering is important here. Overrides Int.MaxValue default for
+ // bootstrap streams with explicitly configured values, in cases where
// users have defined a bootstrap stream's priority in config.
defaultPrioritizedStreams ++ prioritizedBootstrapStreams ++ prioritizedStreams
} else {
@@ -87,7 +87,8 @@ object DefaultChooser extends Logging {
priorities,
prioritizedChoosers,
bootstrapStreamMetadata,
- registry)
+ registry,
+ systemAdmins)
}
}
@@ -244,7 +245,13 @@ class DefaultChooser(
/**
* Metrics registry to be used when wiring up wrapped choosers.
*/
- registry: MetricsRegistry = new MetricsRegistryMap) extends MessageChooser with Logging {
+ registry: MetricsRegistry = new MetricsRegistryMap,
+
+ /**
+ * Defines a mapping from system name to SystemAdmin.
+ * This is useful for determining if a bootstrap SystemStream is caught up.
+ */
+ systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
val chooser = {
val useBatching = batchSize.isDefined
@@ -256,7 +263,7 @@ class DefaultChooser(
val maybePrioritized = if (usePriority) {
new TieredPriorityChooser(prioritizedStreams, prioritizedChoosers, DefaultChooser)
} else if (DefaultChooser == null) {
- // Null wrapped chooser without a priority chooser is not allowed
+ // Null wrapped chooser without a priority chooser is not allowed
// because DefaultChooser needs an underlying message chooser.
throw new SamzaException("A null chooser was given to the DefaultChooser. This is not allowed unless you are using prioritized/bootstrap streams, which you're not.")
} else {
@@ -270,7 +277,7 @@ class DefaultChooser(
}
if (useBootstrapping) {
- new BootstrappingChooser(maybeBatching, bootstrapStreamMetadata, new BootstrappingChooserMetrics(registry))
+ new BootstrappingChooser(maybeBatching, bootstrapStreamMetadata, new BootstrappingChooserMetrics(registry), systemAdmins)
} else {
maybeBatching
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 2e0180d..2a095ce 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -24,6 +24,8 @@ import java.util.Arrays
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
+import org.apache.samza.container.MockSystemAdmin
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -34,7 +36,6 @@ import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters
import scala.collection.JavaConversions._
-import scala.collection.immutable.Queue
@RunWith(value = classOf[Parameterized])
class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
@@ -186,7 +187,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
val mock = new MockMessageChooser
val metadata1 = getMetadata(envelope1, "123")
val metadata2 = getMetadata(envelope2, "321")
- val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2))
+ val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
chooser.register(envelope1.getSystemStreamPartition, "1")
chooser.register(envelope2.getSystemStreamPartition, "1")
@@ -198,6 +199,34 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
}
+
+ @Test
+ def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = {
+ val mock = new MockMessageChooser
+ val metadata1 = getMetadata(envelope1, "123")
+ val metadata2 = getMetadata(envelope2, "321")
+ val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
+
+ // Envelope1 is registered by multiple tasks, each one of them having different offsets.
+ chooser.register(envelope1.getSystemStreamPartition, "1")
+ chooser.register(envelope1.getSystemStreamPartition, "2")
+ chooser.register(envelope1.getSystemStreamPartition, null)
+
+ // Envelope2 is registered by multiple tasks, each one of them having different offsets.
+ chooser.register(envelope2.getSystemStreamPartition, "1")
+ chooser.register(envelope2.getSystemStreamPartition, "2")
+ chooser.register(envelope2.getSystemStreamPartition, null)
+
+
+ chooser.start
+
+ // it should only contain stream partition 0 and stream1 partition 1
+ val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition)
+ assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
+ val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
+ assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
+
+ }
}
object TestBootstrappingChooser {
@@ -206,6 +235,6 @@ object TestBootstrappingChooser {
// chooser.
@Parameters
def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList(
- Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata)),
- Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata)))
+ Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))),
+ Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = Map("kafka" -> new MockSystemAdmin))))
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 7fb70b2..5a04469 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -21,6 +21,8 @@ package org.apache.samza.system.chooser
import org.apache.samza.Partition
import org.apache.samza.config.{DefaultChooserConfig, MapConfig}
+import org.apache.samza.container.MockSystemAdmin
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
import org.apache.samza.util.BlockingEnvelopeMap
@@ -66,7 +68,9 @@ class TestDefaultChooser {
1 -> mock2),
Map(
envelope1.getSystemStreamPartition.getSystemStream -> streamMetadata,
- envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata))
+ envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata),
+ new MetricsRegistryMap(),
+ Map("kafka" -> new MockSystemAdmin()))
chooser.register(envelope1.getSystemStreamPartition, null)
chooser.register(envelope2.getSystemStreamPartition, null)