You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/06/23 23:02:28 UTC

samza git commit: SAMZA-720: Fix BootstrapChooser hanging problem

Repository: samza
Updated Branches:
  refs/heads/master e2906c878 -> 99dbca19d


SAMZA-720: Fix BootstrapChooser hanging problem


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/99dbca19
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/99dbca19
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/99dbca19

Branch: refs/heads/master
Commit: 99dbca19dde7df0d2027059b57673536965659b5
Parents: e2906c8
Author: Yan Fang <ya...@gmail.com>
Authored: Tue Jun 23 14:02:17 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Tue Jun 23 14:02:17 2015 -0700

----------------------------------------------------------------------
 .../system/chooser/BootstrappingChooser.scala     | 11 +++++++++++
 .../system/chooser/TestBootstrappingChooser.scala | 18 ++++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/99dbca19/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index dd500b9..1cd8e06 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -91,12 +91,21 @@ class BootstrappingChooser(
     .toSet
 
   /**
+   * Store all the systemStreamPartitions registered
+   */
+  var registeredSystemStreamPartitions = Set[SystemStreamPartition]()
+
+  /**
    * The number of lagging partitions that the underlying wrapped chooser has
    * been updated with, grouped by SystemStream.
    */
   var updatedSystemStreams = Map[SystemStream, Int]()
 
   def start = {
+    // remove the systemStreamPartitions not registered.
+    laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
+    systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}
+
     debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
     info("Got lagging partition counts for bootstrap streams: %s" format systemStreamLagCounts)
     metrics.setLaggingSystemStreams(() => laggingSystemStreamPartitions.size)
@@ -118,6 +127,8 @@ class BootstrappingChooser(
     checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
 
     wrapped.register(systemStreamPartition, offset)
+
+    registeredSystemStreamPartitions += systemStreamPartition
   }
 
   def update(envelope: IncomingMessageEnvelope) {

http://git-wip-us.apache.org/repos/asf/samza/blob/99dbca19/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 3c2693c..2e0180d 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -180,6 +180,24 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     assertNull(chooser.choose)
     // Fin.
   }
+
+  @Test
+  def testChooserRegisteredCorrectSsps {
+    val mock = new MockMessageChooser
+    val metadata1 = getMetadata(envelope1, "123")
+    val metadata2 = getMetadata(envelope2, "321")
+    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2))
+
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.start
+
+    // it should only contain stream partition 0 and stream1 partition 1
+    val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition)
+    assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
+    val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
+    assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
+  }
 }
 
 object TestBootstrappingChooser {