You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/06/23 23:02:28 UTC
samza git commit: SAMZA-720: Fix BootstrapChooser hanging problem
Repository: samza
Updated Branches:
refs/heads/master e2906c878 -> 99dbca19d
SAMZA-720: Fix BootstrapChooser hanging problem
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/99dbca19
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/99dbca19
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/99dbca19
Branch: refs/heads/master
Commit: 99dbca19dde7df0d2027059b57673536965659b5
Parents: e2906c8
Author: Yan Fang <ya...@gmail.com>
Authored: Tue Jun 23 14:02:17 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Committed: Tue Jun 23 14:02:17 2015 -0700
----------------------------------------------------------------------
.../system/chooser/BootstrappingChooser.scala | 11 +++++++++++
.../system/chooser/TestBootstrappingChooser.scala | 18 ++++++++++++++++++
2 files changed, 29 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/99dbca19/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index dd500b9..1cd8e06 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -91,12 +91,21 @@ class BootstrappingChooser(
.toSet
/**
+ * The set of all SystemStreamPartitions that have been registered with this chooser.
+ */
+ var registeredSystemStreamPartitions = Set[SystemStreamPartition]()
+
+ /**
* The number of lagging partitions that the underlying wrapped chooser has
* been updated with, grouped by SystemStream.
*/
var updatedSystemStreams = Map[SystemStream, Int]()
def start = {
+ // remove the systemStreamPartitions not registered.
+ laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
+ systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}
+
debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
info("Got lagging partition counts for bootstrap streams: %s" format systemStreamLagCounts)
metrics.setLaggingSystemStreams(() => laggingSystemStreamPartitions.size)
@@ -118,6 +127,8 @@ class BootstrappingChooser(
checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
wrapped.register(systemStreamPartition, offset)
+
+ registeredSystemStreamPartitions += systemStreamPartition
}
def update(envelope: IncomingMessageEnvelope) {
http://git-wip-us.apache.org/repos/asf/samza/blob/99dbca19/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 3c2693c..2e0180d 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -180,6 +180,24 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
assertNull(chooser.choose)
// Fin.
}
+
+ @Test
+ def testChooserRegisteredCorrectSsps {
+ val mock = new MockMessageChooser
+ val metadata1 = getMetadata(envelope1, "123")
+ val metadata2 = getMetadata(envelope2, "321")
+ val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2))
+
+ chooser.register(envelope1.getSystemStreamPartition, "1")
+ chooser.register(envelope2.getSystemStreamPartition, "1")
+ chooser.start
+
+ // after start, only the two registered SSPs (stream partition 0 and stream1 partition 1) should remain lagging, with a lag count of 1 per system stream
+ val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition)
+ assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
+ val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
+ assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
+ }
}
object TestBootstrappingChooser {