You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/30 01:48:05 UTC

samza git commit: SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging partition while choosing.

Repository: samza
Updated Branches:
  refs/heads/master 9f323c950 -> 4187ba953


SAMZA-1728: BootstrappingChooser - Call checkOffset only for a lagging partition while choosing.

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #533 from atoomula/chooser


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4187ba95
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4187ba95
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4187ba95

Branch: refs/heads/master
Commit: 4187ba9535da5c88a9e22754927da58600b48447
Parents: 9f323c9
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue May 29 18:48:02 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue May 29 18:48:02 2018 -0700

----------------------------------------------------------------------
 .../system/chooser/BootstrappingChooser.scala   | 10 ++--
 .../chooser/TestBootstrappingChooser.scala      | 62 ++++++++++++++++++--
 2 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4187ba95/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 38e2cfa..f50f27d 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -196,12 +196,12 @@ class BootstrappingChooser(
           val systemStream = systemStreamPartition.getSystemStream
 
           updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
-        }
 
-        // If the offset we just read is the same as the offset for the last
-        // message (newest) in this system stream partition, then we have read
-        // all messages, and can mark this SSP as bootstrapped.
-        checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+          // If the offset we just read is the same as the offset for the last
+          // message (newest) in this system stream partition, then we have read
+          // all messages, and can mark this SSP as bootstrapped.
+          checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+        }
       }
 
       envelope

http://git-wip-us.apache.org/repos/asf/samza/blob/4187ba95/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index a017518..1a99355 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -37,10 +37,14 @@ import scala.collection.JavaConverters._
 
 @RunWith(value = classOf[Parameterized])
 class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
-  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1);
-  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2);
-  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3);
-  val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
+  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1)
+  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2)
+  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3)
+  val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4)
+  val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "124", null, 5)
+  val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "125", null, 6)
+  val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "124", null, 7)
+  val envelope8 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "125", null, 8)
 
   /**
    * Helper function to create metadata for a single envelope with a single offset.
@@ -202,6 +206,56 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
   }
 
   @Test
+  def testChooserShouldHaveNoLaggingSspsAfterCaughtUp {
+    val mockMessageChooser = new MockMessageChooser
+    val sspMetadataMap =
+      Map(envelope3.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null),
+      envelope2.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, "123", null))
+    val metadata = new SystemStreamMetadata(
+      envelope3.getSystemStreamPartition.getStream,
+      sspMetadataMap.asJava)
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin)
+    val chooser = new BootstrappingChooser(mockMessageChooser, Map(envelope2.getSystemStreamPartition.getSystemStream -> metadata),
+      new BootstrappingChooserMetrics(), systemAdmins)
+
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.register(envelope3.getSystemStreamPartition, "1")
+    chooser.start
+
+    // There should be 2 lagging partitions
+    assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 2), chooser.systemStreamLagCounts)
+
+    assertNull(chooser.choose)
+    chooser.update(envelope5) // ssp1 is now marked as not lagging
+    assertEquals(envelope5, chooser.choose)
+
+    // There should be 1 lagging partition
+    assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts)
+
+    // Update with one more envelope from ssp1 and make sure that systemStreamLagCounts is still 1
+    chooser.update(envelope6)
+    assertEquals(null, chooser.choose) // no events are expected to be chosen from ssp1 until lagging ssp0 has envelopes
+
+    chooser.update(envelope3)
+    assertEquals(envelope6, chooser.choose)
+    assertEquals(envelope3, chooser.choose)
+
+    // There should still be 1 lagging partition
+    assertEquals(Map(envelope2.getSystemStreamPartition.getSystemStream -> 1), chooser.systemStreamLagCounts)
+
+    chooser.update(envelope7)
+    assertEquals(envelope7, chooser.choose)  // ssp0 is now marked as not lagging
+
+    // chooser should not have any lagging partitions
+    assertTrue(chooser.laggingSystemStreamPartitions.isEmpty)
+    assertTrue(chooser.systemStreamLagCounts.isEmpty)
+
+    chooser.update(envelope8)
+    assertEquals(envelope8, chooser.choose)
+  }
+
+  @Test
   def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = {
     val mockMessageChooser = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")