Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/19 19:47:20 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #14020: UnboundedSourceAsSDFWrapper should update WatermarkEstimator with current watermark every time tryClaim is called

boyuanzz commented on a change in pull request #14020:
URL: https://github.com/apache/beam/pull/14020#discussion_r579435256



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +537,7 @@ public ProcessContinuation processElement(
       while (tracker.tryClaim(out) && out[0] != null) {
         receiver.outputWithTimestamp(
             new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
+        watermarkEstimator.setWatermark(tracker.currentRestriction().getWatermark());

Review comment:
       >  I think we should leave the call to setWatermark only outside of this while loop.
   
   The tricky part is that when we exit the `tryClaim()` loop, either trySplit() has been called or the source reader has returned false from advance(). When trySplit() is called, the current restriction is replaced with `EmptyUnboundedSource`. So outside of the `tryClaim()` loop, the watermark we get from `currentRestriction` is actually that of `EmptyUnboundedSource`, which is MAX_TIMESTAMP.
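A minimal, hypothetical simulation of the two loop exits described above. `Restriction`, `Tracker`, `watermarkReadAfterLoop`, and the plain `long` timestamps are stand-ins invented for illustration, not Beam's restriction or tracker classes. It only assumes, as the comment states, that a split replaces the current restriction with an empty one whose watermark is MAX_TIMESTAMP.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation, not Beam's API: timestamps are plain longs and
// Long.MAX_VALUE stands in for MAX_TIMESTAMP.
public class RestrictionWatermarkAfterLoop {
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  // Hypothetical restriction that only carries its reader's watermark.
  static final class Restriction {
    final long watermark;

    Restriction(long watermark) {
      this.watermark = watermark;
    }
  }

  // Hypothetical tracker over a queue of record timestamps.
  static final class Tracker {
    private final Deque<Long> pending;
    private Restriction current = new Restriction(Long.MIN_VALUE);
    private boolean split = false;

    Tracker(Deque<Long> pending) {
      this.pending = pending;
    }

    // Returns false once the restriction was split or the reader has no more
    // records (the two loop exits described in the comment).
    boolean tryClaim() {
      if (split || pending.isEmpty()) {
        return false;
      }
      // Simplification: the watermark advances to the claimed record's timestamp.
      current = new Restriction(pending.poll());
      return true;
    }

    // After a split the primary becomes an empty restriction whose watermark is MAX.
    void trySplit() {
      split = true;
      current = new Restriction(MAX_TIMESTAMP);
    }

    Restriction currentRestriction() {
      return current;
    }
  }

  // Runs the claim loop and reads the restriction's watermark only after the loop exits.
  static long watermarkReadAfterLoop(boolean splitAfterTwoClaims) {
    Tracker tracker = new Tracker(new ArrayDeque<>(List.of(10L, 20L, 30L)));
    int claimed = 0;
    while (tracker.tryClaim()) {
      claimed++;
      // Simulates the runner checkpointing while the loop is still running.
      if (splitAfterTwoClaims && claimed == 2) {
        tracker.trySplit();
      }
    }
    return tracker.currentRestriction().watermark;
  }

  public static void main(String[] args) {
    System.out.println(watermarkReadAfterLoop(false)); // 30
    System.out.println(watermarkReadAfterLoop(true));  // 9223372036854775807 (MAX)
  }
}
```

Without a split, the post-loop read returns 30, the last claimed record's watermark; with a split after two claims it returns Long.MAX_VALUE, which is the value a setWatermark call placed only after the loop would pick up.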
   
   Besides, the SDK gets the watermark for the `DelayedBundleApplication` from the `WatermarkEstimator` before it calls trySplit[1]. So we actually need to update the WatermarkEstimator with the correct watermark every time we call tryClaim.
   [1] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java#L330-L331
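A hypothetical sketch of why that ordering matters, assuming (per the comment) that the runner reads the estimator's watermark for the residual before it calls trySplit(). `ManualEstimator`, `residualWatermark`, and the plain `long` timestamps are invented for illustration and are not Beam's API; the sketch compares setting the watermark on every claim with setting it only after the loop.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation, not Beam's API: timestamps are plain longs.
public class WatermarkAtCheckpoint {
  static final long MIN_TIMESTAMP = Long.MIN_VALUE;
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  // Hypothetical manual estimator: reports whatever watermark was last set.
  static final class ManualEstimator {
    private long watermark = MIN_TIMESTAMP;

    void setWatermark(long watermark) {
      this.watermark = watermark;
    }

    long currentWatermark() {
      return watermark;
    }
  }

  // Runs the claim loop and simulates a checkpoint after `checkpointAfter` claims.
  // Returns the watermark the runner would attach to the residual, read from the
  // estimator BEFORE the split (MIN_TIMESTAMP if no checkpoint happens).
  static long residualWatermark(boolean updatePerClaim, int checkpointAfter) {
    Deque<Long> records = new ArrayDeque<>(List.of(10L, 20L, 30L));
    ManualEstimator estimator = new ManualEstimator();
    long restrictionWatermark = MIN_TIMESTAMP;
    long residual = MIN_TIMESTAMP;
    boolean split = false;
    int claimed = 0;
    // Stand-in for `while (tracker.tryClaim(out) && out[0] != null)`.
    while (!split && !records.isEmpty()) {
      restrictionWatermark = records.poll();
      claimed++;
      if (updatePerClaim) {
        // The change under review: keep the estimator in sync on every claim.
        estimator.setWatermark(restrictionWatermark);
      }
      if (claimed == checkpointAfter) {
        // Runner order: read the estimator for the residual first, then split.
        residual = estimator.currentWatermark();
        split = true;
        restrictionWatermark = MAX_TIMESTAMP; // primary is now the empty source
      }
    }
    if (!updatePerClaim) {
      // Updating only after the loop: after a split this sets MAX_TIMESTAMP, and
      // in any case it runs after the residual watermark was already captured.
      estimator.setWatermark(restrictionWatermark);
    }
    return residual;
  }

  public static void main(String[] args) {
    System.out.println(residualWatermark(true, 2));  // 20
    System.out.println(residualWatermark(false, 2)); // -9223372036854775808 (MIN)
  }
}
```

With a checkpoint after the second record, per-claim updates hand the residual a watermark of 20, while updating only after the loop hands it the estimator's initial minimum.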




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org