You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/07/27 00:18:00 UTC

samza git commit: Handle end of stream in BootstrapChooser

Repository: samza
Updated Branches:
  refs/heads/master c4dfbd347 -> 43c36e6f2


Handle end of stream in BootstrapChooser

Handle end-of-stream envelopes in the bootstrapping chooser, and don't invoke the offset check for them, since the offsets of end-of-stream envelopes are not comparable.

Author: Bharath Kumarasubramanian <bk...@linkedin.com>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #582 from bharathkk/end-of-stream


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/43c36e6f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/43c36e6f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/43c36e6f

Branch: refs/heads/master
Commit: 43c36e6f2e4c51c916011a8a35830eb141b3567f
Parents: c4dfbd3
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Thu Jul 26 17:17:51 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Thu Jul 26 17:17:51 2018 -0700

----------------------------------------------------------------------
 .../system/chooser/BootstrappingChooser.scala   | 35 ++++++++++++--------
 1 file changed, 22 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/43c36e6f/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index f50f27d..535fb91 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -197,10 +197,14 @@ class BootstrappingChooser(
 
           updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
 
-          // If the offset we just read is the same as the offset for the last
-          // message (newest) in this system stream partition, then we have read
-          // all messages, and can mark this SSP as bootstrapped.
-          checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+          if (envelope.isEndOfStream) {
+            markSspAsCaughtUp(systemStreamPartition)
+          } else {
+            // If the offset we just read is the same as the offset for the last
+            // message (newest) in this system stream partition, then we have read
+            // all messages, and can mark this SSP as bootstrapped.
+            checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+          }
         }
       }
 
@@ -277,18 +281,23 @@ class BootstrappingChooser(
     // The SSP is no longer lagging if the envelope's offset is greater than or equal to the
     // latest offset.
     if (comparatorResult != null && comparatorResult.intValue() >= 0) {
-      laggingSystemStreamPartitions -= systemStreamPartition
-      systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
+      markSspAsCaughtUp(systemStreamPartition)
+    }
+  }
 
-      debug("Bootstrap stream partition is fully caught up: %s" format systemStreamPartition)
+  private def markSspAsCaughtUp(systemStreamPartition: SystemStreamPartition) = {
+    val systemStream: SystemStream = systemStreamPartition.getSystemStream
+    laggingSystemStreamPartitions -= systemStreamPartition
+    systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
 
-      if (systemStreamLagCounts(systemStream) == 0) {
-        info("Bootstrap stream is fully caught up: %s" format systemStream)
+    debug("Bootstrap stream partition is fully caught up: %s" format systemStreamPartition)
 
-        // If the lag count is 0, then no partition for this stream is lagging
-        // (the stream has been fully caught up).
-        systemStreamLagCounts -= systemStream
-      }
+    if (systemStreamLagCounts(systemStream) == 0) {
+      info("Bootstrap stream is fully caught up: %s" format systemStream)
+
+      // If the lag count is 0, then no partition for this stream is lagging
+      // (the stream has been fully caught up).
+      systemStreamLagCounts -= systemStream
     }
   }