You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/14 00:43:44 UTC
samza git commit: SAMZA-1611: BootstrappingChooser should use
systemAdmin offsetComparator API to compare the offsets
Repository: samza
Updated Branches:
refs/heads/master 49e5073c3 -> 3f9b96704
SAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compare the offsets
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #443 from atoomula/bootstrap
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3f9b9670
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3f9b9670
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3f9b9670
Branch: refs/heads/master
Commit: 3f9b967041e4108c0071033a73f2595e56ed3e0b
Parents: 49e5073
Author: Aditya Toomula <at...@linkedin.com>
Authored: Tue Mar 13 17:43:45 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Mar 13 17:43:45 2018 -0700
----------------------------------------------------------------------
.../samza/system/chooser/BootstrappingChooser.scala | 12 ++++++++++--
.../samza/system/chooser/TestBootstrappingChooser.scala | 6 +++---
.../samza/system/chooser/TestDefaultChooser.scala | 10 +++++-----
3 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index 212ec05..ffc0c90 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -266,9 +266,17 @@ class BootstrappingChooser(
trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
- // The SSP is no longer lagging if the envelope's offset equals the
+ // Let's compare offset of the chosen message with offsetToCheck.
+ val comparatorResult: Integer = if (offset == null || offsetToCheck == null) {
+ -1
+ } else {
+ val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+ systemAdmin.offsetComparator(offset, offsetToCheck)
+ }
+
+ // The SSP is no longer lagging if the envelope's offset is greater than or equal to the
// latest offset.
- if (offset != null && offset.equals(offsetToCheck)) {
+ if (comparatorResult != null && comparatorResult.intValue() >= 0) {
laggingSystemStreamPartitions -= systemStreamPartition
systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index e56206a..02791bb 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
@RunWith(value = classOf[Parameterized])
class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
- val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
- val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
- val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 3);
+ val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1);
+ val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2);
+ val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "122", null, 3);
val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/3f9b9670/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index df5282c..e21bd9c 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -32,13 +32,13 @@ import org.junit.Test
import scala.collection.JavaConverters._
class TestDefaultChooser {
- val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
- val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), null, null, 2);
- val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), null, null, 3);
+ val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "120", null, 1);
+ val envelope2 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(1)), "121", null, 2);
+ val envelope3 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream2", new Partition(0)), "122", null, 3);
val envelope4 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), "123", null, 4);
- val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), null, null, 5);
+ val envelope5 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "320", null, 5);
val envelope6 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(1)), "321", null, 6);
- val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), null, null, 7);
+ val envelope7 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream1", new Partition(0)), "653", null, 7);
val envelope8 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream3", new Partition(0)), "654", null, 8);
@Test