You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/13 19:07:53 UTC

samza git commit: SAMZA-1100; Exception when using a stream as both bootstrap and broadcast.

Repository: samza
Updated Branches:
  refs/heads/master 5a88b9e47 -> ffd04d9d6


SAMZA-1100; Exception when using a stream as both bootstrap and broadcast.

When a task input stream is used as both a broadcast and a bootstrap stream in a Samza job, BootstrappingChooser marks the stream as bootstrapped as soon as a single task finishes consuming all the SystemStreamPartitions. (This happens when the starting offsets for all partitions in the input stream are of type upcoming.) This patch fixes this by marking a stream as bootstrapped only when all the SystemStreamPartitions in an input stream have been consumed by all the expected tasks.

More details here : https://issues.apache.org/jira/browse/SAMZA-1100

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Prateek Maheshwari<pr...@linkedin.com>

Closes #68 from shanthoosh/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ffd04d9d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ffd04d9d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ffd04d9d

Branch: refs/heads/master
Commit: ffd04d9d674437b44d438e3e372aa0cbbf6b7044
Parents: 5a88b9e
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Mon Mar 13 12:07:45 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Mon Mar 13 12:07:45 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |  2 +-
 .../system/chooser/BootstrappingChooser.scala   | 59 +++++++++++++++-----
 .../samza/system/chooser/DefaultChooser.scala   | 23 +++++---
 .../chooser/TestBootstrappingChooser.scala      | 37 ++++++++++--
 .../system/chooser/TestDefaultChooser.scala     |  6 +-
 5 files changed, 98 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 89522dc..b00e10f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -354,7 +354,7 @@ object SamzaContainer extends Logging {
 
     val chooserFactory = Util.getObj[MessageChooserFactory](chooserFactoryClassName)
 
-    val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry)
+    val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry, systemAdmins)
 
     info("Setting up metrics reporters.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index fc99c84..f71bcfb 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -19,19 +19,21 @@
 
 package org.apache.samza.system.chooser
 
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.SamzaException
 import org.apache.samza.util.Logging
 import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStreamMetadata
-import scala.collection.JavaConversions._
-import org.apache.samza.SamzaException
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
 /**
  * BootstrappingChooser is a composable MessageChooser that only chooses
  * an envelope when it's received at least one envelope for each SystemStream.
@@ -69,7 +71,13 @@ class BootstrappingChooser(
   /**
    * An object that holds all of the metrics related to bootstrapping.
    */
-  metrics: BootstrappingChooserMetrics = new BootstrappingChooserMetrics) extends MessageChooser with Logging {
+  metrics: BootstrappingChooserMetrics = new BootstrappingChooserMetrics,
+
+  /**
+   * A map from system name to SystemAdmin that is used for
+   * offset comparisons.
+   */
+  systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
 
   /**
    * The number of lagging partitions for each SystemStream that's behind.
@@ -91,9 +99,10 @@ class BootstrappingChooser(
     .toSet
 
   /**
-   * Store all the systemStreamPartitions registered
+   * Mapping from the systemStreamPartition to the lowest registered offset.
+   * When multiple offsets are registered for a system stream partition, the lowest offset is chosen.
    */
-  var registeredSystemStreamPartitions = Set[SystemStreamPartition]()
+  var registeredSystemStreamPartitions = mutable.Map[SystemStreamPartition, String]()
 
   /**
    * The number of lagging partitions that the underlying wrapped chooser has
@@ -102,8 +111,16 @@ class BootstrappingChooser(
   var updatedSystemStreams = Map[SystemStream, Int]()
 
   def start = {
+    for ((systemStreamPartition, offset) <- registeredSystemStreamPartitions) {
+      // If the offset we're starting to consume from is the same as the upcoming
+      // offset for this system stream partition, then we've already read all
+      // messages in the stream, and we're at head for this system stream
+      // partition.
+      checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
+    }
+
     // remove the systemStreamPartitions not registered.
-    laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
+    laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.keys.contains(_))
     systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}
 
     debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
@@ -120,15 +137,27 @@ class BootstrappingChooser(
   override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
     debug("Registering stream partition with offset: %s, %s" format (systemStreamPartition, offset))
 
-    // If the offset we're starting to consume from is the same as the upcoming 
-    // offset for this system stream partition, then we've already read all
-    // messages in the stream, and we're at head for this system stream 
-    // partition.
-    checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
-
     wrapped.register(systemStreamPartition, offset)
 
-    registeredSystemStreamPartitions += systemStreamPartition
+    val system = systemStreamPartition.getSystem
+    val systemAdmin = systemAdmins.getOrElse(system, throw new SamzaException("SystemAdmin is undefined for System: %s" format system))
+    /**
+     * SAMZA-1100: When an input SystemStream is consumed as both bootstrap and broadcast,
+     * BootstrappingChooser should record the lowest offset for each registered SystemStreamPartition.
+     * When multiple tasks processing a SystemStreamPartition run within a container
+     * and share the same chooser, the lowest offset should be chosen as the starting offset.
+     */
+    if (!registeredSystemStreamPartitions.contains(systemStreamPartition)) {
+      registeredSystemStreamPartitions += systemStreamPartition -> offset
+    } else if (offset != null) {
+      val existingOffset = registeredSystemStreamPartitions(systemStreamPartition)
+      val comparatorResult: Integer = systemAdmin.offsetComparator(existingOffset, offset)
+      if (comparatorResult == null) {
+        warn("Existing offset: %s and incoming offset: %s of system stream partition: %s are not comparable." format (existingOffset, offset, systemStreamPartition))
+      } else if (comparatorResult > 0) {
+        registeredSystemStreamPartitions += systemStreamPartition -> offset
+      }
+    }
   }
 
   def update(envelope: IncomingMessageEnvelope) {

http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
index b433713..c0805c4 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -22,14 +22,14 @@ package org.apache.samza.system.chooser
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
 import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemStream, SystemStreamMetadata, SystemStreamPartition}
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
 
 
 object DefaultChooser extends Logging {
-  def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry) = {
+  def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry, systemAdmins: Map[String, SystemAdmin]) = {
     val chooserConfig = new DefaultChooserConfig(config)
     val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None
 
@@ -65,8 +65,8 @@ object DefaultChooser extends Logging {
     debug("Got bootstrap stream metadata: %s" format bootstrapStreamMetadata)
 
     val priorities = if (usePriority) {
-      // Ordering is important here. Overrides Int.MaxValue default for 
-      // bootstrap streams with explicitly configured values, in cases where 
+      // Ordering is important here. Overrides Int.MaxValue default for
+      // bootstrap streams with explicitly configured values, in cases where
       // users have defined a bootstrap stream's priority in config.
       defaultPrioritizedStreams ++ prioritizedBootstrapStreams ++ prioritizedStreams
     } else {
@@ -87,7 +87,8 @@ object DefaultChooser extends Logging {
       priorities,
       prioritizedChoosers,
       bootstrapStreamMetadata,
-      registry)
+      registry,
+      systemAdmins)
   }
 }
 
@@ -244,7 +245,13 @@ class DefaultChooser(
   /**
    * Metrics registry to be used when wiring up wrapped choosers.
    */
-  registry: MetricsRegistry = new MetricsRegistryMap) extends MessageChooser with Logging {
+  registry: MetricsRegistry = new MetricsRegistryMap,
+
+  /**
+   * Defines a mapping from system name to SystemAdmin.
+   * This is useful for determining if a bootstrap SystemStream is caught up.
+   */
+  systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
 
   val chooser = {
     val useBatching = batchSize.isDefined
@@ -256,7 +263,7 @@ class DefaultChooser(
     val maybePrioritized = if (usePriority) {
       new TieredPriorityChooser(prioritizedStreams, prioritizedChoosers, DefaultChooser)
     } else if (DefaultChooser == null) {
-      // Null wrapped chooser without a priority chooser is not allowed 
+      // Null wrapped chooser without a priority chooser is not allowed
       // because DefaultChooser needs an underlying message chooser.
       throw new SamzaException("A null chooser was given to the DefaultChooser. This is not allowed unless you are using prioritized/bootstrap streams, which you're not.")
     } else {
@@ -270,7 +277,7 @@ class DefaultChooser(
     }
 
     if (useBootstrapping) {
-      new BootstrappingChooser(maybeBatching, bootstrapStreamMetadata, new BootstrappingChooserMetrics(registry))
+      new BootstrappingChooser(maybeBatching, bootstrapStreamMetadata, new BootstrappingChooserMetrics(registry), systemAdmins)
     } else {
       maybeBatching
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 2e0180d..2a095ce 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -24,6 +24,8 @@ import java.util.Arrays
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.container.MockSystemAdmin
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -34,7 +36,6 @@ import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConversions._
-import scala.collection.immutable.Queue
 
 @RunWith(value = classOf[Parameterized])
 class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
@@ -186,7 +187,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val mock = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")
     val metadata2 = getMetadata(envelope2, "321")
-    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2))
+    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
 
     chooser.register(envelope1.getSystemStreamPartition, "1")
     chooser.register(envelope2.getSystemStreamPartition, "1")
@@ -198,6 +199,34 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
     assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
   }
+
+  @Test
+  def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = {
+    val mock = new MockMessageChooser
+    val metadata1 = getMetadata(envelope1, "123")
+    val metadata2 = getMetadata(envelope2, "321")
+    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
+
+    // Envelope1 is registered by multiple tasks, each one of them having different offsets.
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope1.getSystemStreamPartition, "2")
+    chooser.register(envelope1.getSystemStreamPartition, null)
+
+    // Envelope2 is registered by multiple tasks, each one of them having different offsets.
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, "2")
+    chooser.register(envelope2.getSystemStreamPartition, null)
+
+
+    chooser.start
+
+    // Both registered system stream partitions should still be lagging, one per system stream.
+    val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition)
+    assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
+    val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
+    assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
+
+  }
 }
 
 object TestBootstrappingChooser {
@@ -206,6 +235,6 @@ object TestBootstrappingChooser {
   // chooser.
   @Parameters
   def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList(
-    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata)),
-    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata)))
+    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))),
+    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = Map("kafka" -> new MockSystemAdmin))))
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ffd04d9d/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 7fb70b2..5a04469 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -21,6 +21,8 @@ package org.apache.samza.system.chooser
 
 import org.apache.samza.Partition
 import org.apache.samza.config.{DefaultChooserConfig, MapConfig}
+import org.apache.samza.container.MockSystemAdmin
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
 import org.apache.samza.util.BlockingEnvelopeMap
@@ -66,7 +68,9 @@ class TestDefaultChooser {
         1 -> mock2),
       Map(
         envelope1.getSystemStreamPartition.getSystemStream -> streamMetadata,
-        envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata))
+        envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata),
+      new MetricsRegistryMap(),
+      Map("kafka" -> new MockSystemAdmin()))
 
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.register(envelope2.getSystemStreamPartition, null)