You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/30 01:48:05 UTC
samza git commit: SAMZA-1728: BootstrappingChooser - Call checkOffset
only for a lagging partition while choosing.
Repository: samza
Updated Branches:
refs/heads/master 9f323c950 -> 4187ba953
SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging partition while choosing.
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #533 from atoomula/chooser
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4187ba95
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4187ba95
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4187ba95
Branch: refs/heads/master
Commit: 4187ba9535da5c88a9e22754927da58600b48447
Parents: 9f323c9
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue May 29 18:48:02 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue May 29 18:48:02 2018 -0700
----------------------------------------------------------------------
.../system/chooser/BootstrappingChooser.scala | 10 ++--
.../chooser/TestBootstrappingChooser.scala | 62 ++++++++++++++++++--
2 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4187ba95/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 38e2cfa..f50f27d 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -196,12 +196,12 @@ class BootstrappingChooser(
val systemStream = systemStreamPartition.getSystemStream
updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
- }
- // If the offset we just read is the same as the offset for the last
- // message (newest) in this system stream partition, then we have read
- // all messages, and can mark this SSP as bootstrapped.
- checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+ // If the offset we just read is the same as the offset for the last
+ // message (newest) in this system stream partition, then we have read
+ // all messages, and can mark this SSP as bootstrapped.
+ checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+ }
}
envelope
http://git-wip-us.apache.org/repos/asf/samza/blob/4187ba95/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index a017518..1a99355 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -37,10 +37,14 @@ import scala.collection.JavaConverters._
@RunWith(value = classOf[Parameterized])
class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
- val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1);
- val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2);
- val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3);
- val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+ val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1)
+ val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2)
+ val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3)
+ val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4)
+ val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "124", null, 5)
+ val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "125", null, 6)
+ val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "124", null, 7)
+ val envelope8 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "125", null, 8)
/**
* Helper function to create metadata for a single envelope with a single offset.
@@ -202,6 +206,56 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
}
@Test
+ def testChooserShouldHaveNoLaggingSspsAfterCaughtUp {
+ val mockMessageChooser = new MockMessageChooser
+ val sspMetadataMap =
+ Map(envelope3.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null),
+ envelope2.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null))
+ val metadata = new SystemStreamMetadata(
+ envelope3.getSystemStreamPartition.getStream,
+ sspMetadataMap.asJava)
+ val systemAdmins = mock(classOf[SystemAdmins])
+ when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin)
+ val chooser = new BootstrappingChooser(mockMessageChooser, Map(envelope2.getSystemStreamPartition.getSystemStream -> metadata),
+ new BootstrappingChooserMetrics(), systemAdmins)
+
+ chooser.register(envelope2.getSystemStreamPartition, "1")
+ chooser.register(envelope3.getSystemStreamPartition, "1")
+ chooser.start
+
+ // There should be 2 lagging partitions
+ assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 2), chooser.systemStreamLagCounts)
+
+ assertNull(chooser.choose)
+ chooser.update(envelope5) // ssp1 is now marked as not lagging
+ assertEquals(envelope5, chooser.choose)
+
+ // There should be 1 lagging partition
+ assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts)
+
+ // Update with one more envelope from ssp1 and make sure that systemStreamLagCounts is still 1
+ chooser.update(envelope6)
+ assertEquals(null, chooser.choose) // no events are expected to be chosen from ssp1 until lagging ssp0 has envelopes
+
+ chooser.update(envelope3)
+ assertEquals(envelope6, chooser.choose)
+ assertEquals(envelope3, chooser.choose)
+
+ // There should still be 1 lagging partition
+ assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts)
+
+ chooser.update(envelope7)
+ assertEquals(envelope7, chooser.choose) // ssp0 is now marked as not lagging
+
+ // chooser should not have any lagging partitions
+ assertTrue(chooser.laggingSystemStreamPartitions.isEmpty)
+ assertTrue(chooser.systemStreamLagCounts.isEmpty)
+
+ chooser.update(envelope8)
+ assertEquals(envelope8, chooser.choose)
+ }
+
+ @Test
def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = {
val mockMessageChooser = new MockMessageChooser
val metadata1 = getMetadata(envelope1, "123")