You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/07/27 00:18:00 UTC
samza git commit: Handle end of stream in BootstrapChooser
Repository: samza
Updated Branches:
refs/heads/master c4dfbd347 -> 43c36e6f2
Handle end of stream in BootstrapChooser
Handle end-of-stream envelopes in the bootstrapping chooser, and do not invoke checkOffset for them, since the offsets of end-of-stream envelopes are not comparable.
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #582 from bharathkk/end-of-stream
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/43c36e6f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/43c36e6f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/43c36e6f
Branch: refs/heads/master
Commit: 43c36e6f2e4c51c916011a8a35830eb141b3567f
Parents: c4dfbd3
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Thu Jul 26 17:17:51 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Thu Jul 26 17:17:51 2018 -0700
----------------------------------------------------------------------
.../system/chooser/BootstrappingChooser.scala | 35 ++++++++++++--------
1 file changed, 22 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/43c36e6f/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index f50f27d..535fb91 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -197,10 +197,14 @@ class BootstrappingChooser(
updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
- // If the offset we just read is the same as the offset for the last
- // message (newest) in this system stream partition, then we have read
- // all messages, and can mark this SSP as bootstrapped.
- checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+ if (envelope.isEndOfStream) {
+ markSspAsCaughtUp(systemStreamPartition)
+ } else {
+ // If the offset we just read is the same as the offset for the last
+ // message (newest) in this system stream partition, then we have read
+ // all messages, and can mark this SSP as bootstrapped.
+ checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
+ }
}
}
@@ -277,18 +281,23 @@ class BootstrappingChooser(
// The SSP is no longer lagging if the envelope's offset is greater than or equal to the
// latest offset.
if (comparatorResult != null && comparatorResult.intValue() >= 0) {
- laggingSystemStreamPartitions -= systemStreamPartition
- systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
+ markSspAsCaughtUp(systemStreamPartition)
+ }
+ }
- debug("Bootstrap stream partition is fully caught up: %s" format systemStreamPartition)
+ private def markSspAsCaughtUp(systemStreamPartition: SystemStreamPartition) = {
+ val systemStream: SystemStream = systemStreamPartition.getSystemStream
+ laggingSystemStreamPartitions -= systemStreamPartition
+ systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
- if (systemStreamLagCounts(systemStream) == 0) {
- info("Bootstrap stream is fully caught up: %s" format systemStream)
+ debug("Bootstrap stream partition is fully caught up: %s" format systemStreamPartition)
- // If the lag count is 0, then no partition for this stream is lagging
- // (the stream has been fully caught up).
- systemStreamLagCounts -= systemStream
- }
+ if (systemStreamLagCounts(systemStream) == 0) {
+ info("Bootstrap stream is fully caught up: %s" format systemStream)
+
+ // If the lag count is 0, then no partition for this stream is lagging
+ // (the stream has been fully caught up).
+ systemStreamLagCounts -= systemStream
}
}