You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/14 00:43:44 UTC

samza git commit: SAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compare the offsets

Repository: samza
Updated Branches:
  refs/heads/master 49e5073c3 -> 3f9b96704


SAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compare the offsets

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #443 from atoomula/bootstrap


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3f9b9670
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3f9b9670
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3f9b9670

Branch: refs/heads/master
Commit: 3f9b967041e4108c0071033a73f2595e56ed3e0b
Parents: 49e5073
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue Mar 13 17:43:45 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Mar 13 17:43:45 2018 -0700

----------------------------------------------------------------------
 .../samza/system/chooser/BootstrappingChooser.scala     | 12 ++++++++++--
 .../samza/system/chooser/TestBootstrappingChooser.scala |  6 +++---
 .../samza/system/chooser/TestDefaultChooser.scala       | 10 +++++-----
 3 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 212ec05..ffc0c90 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -266,9 +266,17 @@ class BootstrappingChooser(
 
     trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
 
-    // The SSP is no longer lagging if the envelope's offset equals the
+    // Compare the offset of the chosen message with offsetToCheck.
+    val comparatorResult: Integer = if (offset == null || offsetToCheck == null) {
+      -1
+    } else {
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+      systemAdmin.offsetComparator(offset, offsetToCheck)
+    }
+
+    // The SSP is no longer lagging if the envelope's offset is greater than or equal to the
     // latest offset.
-    if (offset != null && offset.equals(offsetToCheck)) {
+    if (comparatorResult != null && comparatorResult.intValue() >= 0) {
       laggingSystemStreamPartitions -= systemStreamPartition
       systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index e56206a..02791bb 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
 
 @RunWith(value = classOf[Parameterized])
 class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
-  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
-  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
-  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3);
   val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index df5282c..e21bd9c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -32,13 +32,13 @@ import org.junit.Test
 import scala.collection.JavaConverters._
 
 class TestDefaultChooser {
-  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
-  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
-  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), null, null, 3);
+  val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1);
+  val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2);
+  val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), "122", null, 3);
   val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
-  val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 5);
+  val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "320", null, 5);
   val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "321", null, 6);
-  val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 7);
+  val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "653", null, 7);
   val envelope8 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream3", new Partition(0)), "654", null, 8);
 
   @Test