Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/18 13:38:56 UTC

[GitHub] [beam] je-ik opened a new pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

je-ik opened a new pull request #14013:
URL: https://github.com/apache/beam/pull/14013


   The watermark reported by the reader in trySplit needs to be stored and reported after the call to trySplit.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] boyuanzz commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-782309427


   > Hi Boyuan,
   > I think that it is incorrect to call `tracker.currentRestriction().getWatermark()` when tryClaim returns `false`. The returned value of `BoundedWindow.TIMESTAMP_MAX_VALUE` seems not to be the actual watermark. Why would we want to feed this value into the watermark estimator?
   > Regarding the hold - if we set the hold to TIMESTAMP_MAX_VALUE after restriction split, that could cause the output watermark to skip past the watermark of the residual restriction (watermark of the remaining data in the reader), which seems incorrect. That could cause emitting late data even though the reader's watermark is correct. Am I missing something?
   
   The reason why we want to call `watermarkEstimator.setWatermark(currentRestriction.getWatermark());` when `tryClaim()` returns `false` is for tracking watermark when returning ProcessContinuation.resume(). It could happen when there is no output records from reader and we want to read again later.





[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783209882


   > The reason why we want to call `watermarkEstimator.setWatermark(currentRestriction.getWatermark());` when `tryClaim()` returns `false` is for tracking watermark when returning ProcessContinuation.resume(). It could happen when there is no output records from reader and we want to read again later.
   
   That seems incorrect. When tryClaim returns `false`, it is part of the contract to return `ProcessContinuation.done()`:
   https://github.com/apache/beam/blob/aaad864c9acb22e35050f974a7ac74fb7638f085/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java#L221
   When the reader returns `false`, we do not fail the claim; instead we go through `out[0] == null` in `processElement`.
   I think there should be no reason to enforce:
    a) returning ProcessContinuation.done(), and
    b) manually setting the watermark to BoundedWindow.TIMESTAMP_MAX_VALUE
   because that seems redundant.
   
   You are right that the watermark is read *before* the call to trySplit (I overlooked that). That probably means we *must* set the watermark both *before* and *after* the tryClaim loop.
   





[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783596877


   > I'm afraid that setting watermark both before and after the tryClaim loop is still not enough. For most cases, every time tryClaim is called and there is one output record, the watermark should advance. If we only set watermark outside of tryClaim loop, we will still hold back the watermark somehow.
   
   Alright, that is true. I'd suggest we add the watermark to `UnboundedSourceValue`; I'd prefer to avoid materializing `currentRestriction()` in the tryClaim loop. WDYT?
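
A minimal sketch of that suggestion, not the change actually made in the PR: each claimed value carries the reader's watermark, so the loop can advance the estimator once per record without materializing the restriction. `ValueWithWatermark`, `ToyEstimator`, and `drain` are hypothetical stand-ins rather than Beam classes, and watermarks are modelled as plain `long` milliseconds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WatermarkPerClaimSketch {
  /** Hypothetical stand-in for a claimed value that also carries the reader's watermark. */
  static final class ValueWithWatermark {
    final String value;
    final long watermarkMillis;

    ValueWithWatermark(String value, long watermarkMillis) {
      this.value = value;
      this.watermarkMillis = watermarkMillis;
    }
  }

  /** Hypothetical manual estimator: remembers the highest watermark it was given. */
  static final class ToyEstimator {
    long current = Long.MIN_VALUE;

    void setWatermark(long watermarkMillis) {
      current = Math.max(current, watermarkMillis);
    }
  }

  /** Emits every record and advances the estimator inside the loop, once per record. */
  static List<String> drain(Iterator<ValueWithWatermark> reader, ToyEstimator estimator) {
    List<String> out = new ArrayList<>();
    while (reader.hasNext()) {
      ValueWithWatermark v = reader.next();
      out.add(v.value);
      // The watermark travels with the value, so no restriction snapshot is needed here.
      estimator.setWatermark(v.watermarkMillis);
    }
    return out;
  }

  public static void main(String[] args) {
    ToyEstimator estimator = new ToyEstimator();
    List<ValueWithWatermark> records = new ArrayList<>();
    records.add(new ValueWithWatermark("a", 10L));
    records.add(new ValueWithWatermark("b", 25L));
    List<String> out = drain(records.iterator(), estimator);
    System.out.println(out + " watermark=" + estimator.current);
  }
}
```

Updating inside the loop is the point of the sketch: if the estimator were only set before and after the loop, the watermark would lag behind for as long as the reader keeps producing records.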





[GitHub] [beam] boyuanzz commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-785272283


   > @boyuanzz I have adjusted the PR.
   
   Thanks for the quick fix! I'll take a look later today.





[GitHub] [beam] je-ik edited a comment on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik edited a comment on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-784963318


   @boyuanzz I have adjusted the PR.





[GitHub] [beam] je-ik commented on a change in pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #14013:
URL: https://github.com/apache/beam/pull/14013#discussion_r582635836



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -789,6 +799,7 @@ public CheckpointMark getCheckpointMark() {
       private boolean readerHasBeenStarted;
       private Cache<Object, UnboundedReader<OutputT>> cachedReaders;
       private Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder;
+      private Instant lastPreSplitWatermark;

Review comment:
       Yes, you are right that under the current implementation it probably plays no role. On the other hand, I'd like to avoid updating the watermark estimator to `BoundedWindow.TIMESTAMP_MAX_VALUE` (if that is not what actually came from the reader). Therefore, I'd propose adding an `isSplit` method to `UnboundedSourceRestriction` and not updating the watermark estimator in that case (which should then result in returning `ProcessContinuation.done()`, which should be OK).
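
One way to read this proposal, as a rough sketch only: `ToyRestriction`, its `isSplit()` flag, and `maybeUpdate` are hypothetical names and do not reproduce the code in `Read.java`. The estimator is left untouched when the restriction is the finished primary of a split, so the placeholder maximum timestamp never reaches it.

```java
public class SkipWatermarkOnSplitSketch {
  /** Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE, modelled as a long for this sketch. */
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  /** Hypothetical restriction: a watermark plus a flag saying whether it was split off. */
  static final class ToyRestriction {
    final long watermarkMillis;
    final boolean split;

    private ToyRestriction(long watermarkMillis, boolean split) {
      this.watermarkMillis = watermarkMillis;
      this.split = split;
    }

    /** A restriction that is still reading and reports a real watermark. */
    static ToyRestriction reading(long watermarkMillis) {
      return new ToyRestriction(watermarkMillis, false);
    }

    /** The finished primary after a split; its watermark is only a placeholder. */
    static ToyRestriction afterSplit() {
      return new ToyRestriction(MAX_TIMESTAMP, true);
    }

    boolean isSplit() {
      return split;
    }
  }

  /** Returns the estimator's new watermark, ignoring restrictions that were split. */
  static long maybeUpdate(long estimatorWatermark, ToyRestriction restriction) {
    if (restriction.isSplit()) {
      return estimatorWatermark; // keep the last watermark that came from the reader
    }
    return Math.max(estimatorWatermark, restriction.watermarkMillis);
  }

  public static void main(String[] args) {
    long w = maybeUpdate(5L, ToyRestriction.reading(9L));
    w = maybeUpdate(w, ToyRestriction.afterSplit());
    System.out.println("watermark=" + w);
  }
}
```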







[GitHub] [beam] boyuanzz edited a comment on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz edited a comment on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783572819


   > > The reason why we want to call `watermarkEstimator.setWatermark(currentRestriction.getWatermark());` when `tryClaim()` returns `false` is for tracking watermark when returning ProcessContinuation.resume(). It could happen when there is no output records from reader and we want to read again later.
   > 
   > That seems incorrect. When tryClaim returns `false`, it is part of the contract to return `ProcessContinuation.done()`:
   > https://github.com/apache/beam/blob/aaad864c9acb22e35050f974a7ac74fb7638f085/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java#L221
   > 
   > 
   > When the reader returns `false`, we do not fail the claim; instead we go through `out[0] == null` in `processElement`.
   > I think there should be no reason to enforce:
   > a) returning ProcessContinuation.done(), and
   > b) manually setting the watermark to BoundedWindow.TIMESTAMP_MAX_VALUE
   > because that seems redundant.
   
   Thanks, Jan! I think the unbounded wrapper does some hacks there but I'll double check. It would be great if we could change the implementation into a better one.
   
   > You are right that the watermark is read _before_ the call to trySplit (I overlooked that). That probably means we _must_ set the watermark both _before_ and _after_ the tryClaim loop.
   
   I'm afraid that setting watermark both _before_ and _after_ the tryClaim loop is still not enough. For most cases, every time `tryClaim` is called and there is one output record, the watermark should advance. If we only set watermark outside of `tryClaim` loop, we will still hold back the watermark somehow.





[GitHub] [beam] boyuanzz commented on a change in pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14013:
URL: https://github.com/apache/beam/pull/14013#discussion_r582541190



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -789,6 +799,7 @@ public CheckpointMark getCheckpointMark() {
       private boolean readerHasBeenStarted;
       private Cache<Object, UnboundedReader<OutputT>> cachedReaders;
       private Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder;
+      private Instant lastPreSplitWatermark;

Review comment:
       I think we can remove the logic around `lastPreSplitWatermark` and update the newly added tests.







[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-781350001


   R: @lukecwik 





[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783599522


   Yes, I'll do the change.





[GitHub] [beam] boyuanzz commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-781726100


   Hi Jan,
   
   You are right that UnboundedSourceAsSDFWrapperDoFn doesn't update the watermark correctly while tryClaim() is called. I think we should fix UnboundedSourceAsSDFWrapperDoFn instead of EmptyUnboundedSource.EmptyUnboundedReader. Here is my one-line fix, which does the same thing as yours: https://github.com/apache/beam/pull/14020. I really like your test, so would you consider patching my fix into your PR?





[GitHub] [beam] boyuanzz commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-781576773


   Hi Jan, 
   
   From your changes it seems like you want `EmptyUnboundedSource.EmptyUnboundedReader` to report the last watermark instead of `BoundedWindow.TIMESTAMP_MAX_VALUE`. But `EmptyUnboundedSource.EmptyUnboundedReader` is intended to report `BoundedWindow.TIMESTAMP_MAX_VALUE` as its watermark, to mark that the current restriction has finished processing and should not hold the watermark back.
   
   I checked the discussion thread: https://lists.apache.org/thread.html/r3873ed7e3927c6fc81f6793af779e28cea9bc8f674ec89a7709bde31%40%3Cuser.beam.apache.org%3E  and in your last comment you say that `The problem is that under heavy load (i.e. when the UnboundedReader reads data without ever returning false in response to advance()) the watermark is not progressed correctly.`. Could you elaborate on the relationship between that comment and this PR?





[GitHub] [beam] boyuanzz merged pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #14013:
URL: https://github.com/apache/beam/pull/14013


   





[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-784963318


   @boyuanzz I have adjusted the PR.





[GitHub] [beam] codecov[bot] edited a comment on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783335600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=h1) Report
   > Merging [#14013](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=desc) (aaad864) into [master](https://codecov.io/gh/apache/beam/commit/c758a0d68d881df40e80f61691315b6f0222008f?el=desc) (c758a0d) will **increase** coverage by `0.02%`.
   > The diff coverage is `92.33%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/14013/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #14013      +/-   ##
   ==========================================
   + Coverage   82.95%   82.98%   +0.02%     
   ==========================================
     Files         469      469              
     Lines       58343    58298      -45     
   ==========================================
   - Hits        48401    48379      -22     
   + Misses       9942     9919      -23     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.95% <ø> (-0.04%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `79.61% <16.66%> (-0.39%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.03% <42.85%> (-1.65%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/pickler.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvcGlja2xlci5weQ==) | `86.86% <80.00%> (ø)` | |
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `90.80% <88.05%> (-0.28%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/stats.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zdGF0cy5weQ==) | `97.33% <97.91%> (+10.61%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/frame\_base.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lX2Jhc2UucHk=) | `90.50% <100.00%> (-0.05%)` | :arrow_down: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.92% <100.00%> (ø)` | |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `88.09% <0.00%> (-7.15%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `91.91% <0.00%> (-2.21%)` | :arrow_down: |
   | ... and [6 more](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=footer). Last update [c758a0d...070b575](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783335600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=h1) Report
   > Merging [#14013](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=desc) (aaad864) into [master](https://codecov.io/gh/apache/beam/commit/c758a0d68d881df40e80f61691315b6f0222008f?el=desc) (c758a0d) will **increase** coverage by `0.02%`.
   > The diff coverage is `92.33%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/14013/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #14013      +/-   ##
   ==========================================
   + Coverage   82.95%   82.98%   +0.02%     
   ==========================================
     Files         469      469              
     Lines       58343    58298      -45     
   ==========================================
   - Hits        48401    48379      -22     
   + Misses       9942     9919      -23     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.95% <ø> (-0.04%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `79.61% <16.66%> (-0.39%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.03% <42.85%> (-1.65%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/pickler.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvcGlja2xlci5weQ==) | `86.86% <80.00%> (ø)` | |
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `90.80% <88.05%> (-0.28%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/stats.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zdGF0cy5weQ==) | `97.33% <97.91%> (+10.61%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/frame\_base.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lX2Jhc2UucHk=) | `90.50% <100.00%> (-0.05%)` | :arrow_down: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.92% <100.00%> (ø)` | |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `88.09% <0.00%> (-7.15%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `91.91% <0.00%> (-2.21%)` | :arrow_down: |
   | ... and [6 more](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=footer). Last update [c758a0d...d167c88](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [beam] boyuanzz commented on a change in pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #14013:
URL: https://github.com/apache/beam/pull/14013#discussion_r583065052



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -789,6 +799,7 @@ public CheckpointMark getCheckpointMark() {
       private boolean readerHasBeenStarted;
       private Cache<Object, UnboundedReader<OutputT>> cachedReaders;
       private Coder<UnboundedSourceRestriction<OutputT, CheckpointT>> restrictionCoder;
+      private Instant lastPreSplitWatermark;

Review comment:
       That sounds good to me. Thanks for these changes!




[GitHub] [beam] boyuanzz edited a comment on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz edited a comment on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-782309427


   > Hi Boyuan,
   > I think that it is incorrect to call `tracker.currentRestriction().getWatermark()` when tryClaim returns `false`. The returned value of `BoundedWindow.TIMESTAMP_MAX_VALUE` seems not to be the actual watermark. Why would we want to feed this value into the watermark estimator?
   > Regarding the hold - if we set the hold to TIMESTAMP_MAX_VALUE after restriction split, that could cause the output watermark to skip past the watermark of the residual restriction (watermark of the remaining data in the reader), which seems incorrect. That could cause emitting late data even though the reader's watermark is correct. Am I missing something?
   
   The reason we call `watermarkEstimator.setWatermark(currentRestriction.getWatermark());` when `tryClaim()` returns `false` is to track the watermark when returning `ProcessContinuation.resume()`. This can happen when the reader has no output records and we want to read again later.
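
To make the case above concrete, here is a minimal, self-contained sketch of an idle reader whose watermark is still reported when the claim fails. It does not use the Beam API: `ManualEstimator`, `FakeReader` and `process` are hypothetical stand-ins, timestamps are plain epoch millis, and the boolean return value only mimics `ProcessContinuation.resume()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ResumeWatermarkSketch {

  /** Hypothetical manual watermark estimator; timestamps are plain epoch millis. */
  static final class ManualEstimator {
    private long watermark = Long.MIN_VALUE;

    void setWatermark(long candidate) {
      // A watermark never moves backwards.
      watermark = Math.max(watermark, candidate);
    }

    long currentWatermark() {
      return watermark;
    }
  }

  /** Hypothetical reader: a queue of record timestamps plus a watermark of its own. */
  static final class FakeReader {
    private final Deque<Long> pending = new ArrayDeque<>();
    private final long watermark;

    FakeReader(long watermark, long... recordTimestamps) {
      this.watermark = watermark;
      for (long ts : recordTimestamps) {
        pending.add(ts);
      }
    }

    /** Returns the next record timestamp, or null when nothing is available right now. */
    Long poll() {
      return pending.poll();
    }

    long getWatermark() {
      return watermark;
    }
  }

  /** Returns true to signal "resume later", standing in for ProcessContinuation.resume(). */
  static boolean process(FakeReader reader, ManualEstimator estimator) {
    // A non-null poll() plays the role of tryClaim() returning true.
    while (reader.poll() != null) {
      // A real DoFn would output the record here.
    }
    // The claim failed because no record is available. Report the watermark anyway,
    // otherwise an idle source would never advance it.
    estimator.setWatermark(reader.getWatermark());
    return true;
  }

  public static void main(String[] args) {
    ManualEstimator estimator = new ManualEstimator();
    process(new FakeReader(1_000L), estimator); // idle reader, no records
    System.out.println(estimator.currentWatermark());
  }
}
```

The sketch assumes the watermark available at that point is the reader's real watermark, which is exactly what is being questioned in the quoted comment.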


[GitHub] [beam] boyuanzz commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783598412


   > > I'm afraid that setting watermark both before and after the tryClaim loop is still not enough. For most cases, every time tryClaim is called and there is one output record, the watermark should advance. If we only set watermark outside of tryClaim loop, we will still hold back the watermark somehow.
   > 
   > Alright, that is true. I'd suggest we add watermark into the `UnboundedSourceValue`, I'd prefer to avoid materializing the `currentRestriction()` in the tryClaim loop. WDYT?
   
   That sounds good to me. Would you like to make the change in this PR together? Meanwhile I'll double check the implementation to ensure we don't introduce additional bugs.
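
The proposal in the quoted reply, carrying the watermark together with each claimed value, might look roughly like the sketch below. It is not the actual `UnboundedSourceValue` class: `ClaimedValue` and `watermarkAfterEachClaim` are hypothetical names, and timestamps are plain epoch millis.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ValueWithWatermarkSketch {

  /** Hypothetical claimed value that carries the reader's watermark alongside the record. */
  static final class ClaimedValue {
    final String record;
    final long watermark; // epoch millis, an assumption of this sketch

    ClaimedValue(String record, long watermark) {
      this.record = record;
      this.watermark = watermark;
    }
  }

  /** Updates the estimate after every claimed value and returns each intermediate estimate. */
  static List<Long> watermarkAfterEachClaim(List<ClaimedValue> claimed) {
    List<Long> estimates = new ArrayList<>();
    long estimate = Long.MIN_VALUE;
    for (ClaimedValue value : claimed) {
      // No call to currentRestriction() is needed; the watermark travels with the value.
      estimate = Math.max(estimate, value.watermark);
      estimates.add(estimate);
    }
    return estimates;
  }

  public static void main(String[] args) {
    List<ClaimedValue> claimed = Arrays.asList(
        new ClaimedValue("a", 10L), new ClaimedValue("b", 20L), new ClaimedValue("c", 15L));
    System.out.println(watermarkAfterEachClaim(claimed));
  }
}
```

Because the estimate is updated inside the loop, the watermark advances with every record rather than only before and after the loop.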


[GitHub] [beam] boyuanzz edited a comment on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz edited a comment on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-781726100


   Hi Jan,
   
   You are right that UnboundedSourceAsSDFWrapperDoFn doesn't update the watermark correctly while tryClaim() is called. I think we should fix UnboundedSourceAsSDFWrapperDoFn instead of EmptyUnboundedSource.EmptyUnboundedReader. Here is my one-line fix, which does the same thing as yours: https://github.com/apache/beam/pull/14020. I really like your test, so would you consider patching my fix into your PR if it makes sense to you?


[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783220555


   Commit https://github.com/apache/beam/pull/14013/commits/000726ebfae521c8078c0ed2fb7c2945412ca735 should fix that.


[GitHub] [beam] boyuanzz commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783573725


   cc: @y1chi fyi


[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783347335


   Run Java PreCommit


[GitHub] [beam] je-ik commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-781895279


   Hi Boyuan,
   I think that it is incorrect to call `tracker.currentRestriction().getWatermark()` when tryClaim returns `false`. The returned value of `BoundedWindow.TIMESTAMP_MAX_VALUE` seems not to be the actual watermark. Why would we want to feed this value into the watermark estimator?
   Regarding the hold - if we set the hold to TIMESTAMP_MAX_VALUE after restriction split, that could cause the output watermark to skip past the watermark of the residual restriction (watermark of the remaining data in the reader), which seems incorrect. That could cause emitting late data even though the reader's watermark is correct. Am I missing something?
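
The concern about the hold can be illustrated with a deliberately simplified model in which the output watermark is the input watermark capped by the hold. This is only a sketch: the numbers are made up, `Long.MAX_VALUE` stands in for `BoundedWindow.TIMESTAMP_MAX_VALUE`, and `outputWatermark` and `isLate` are hypothetical helpers rather than runner code.

```java
final class HoldAfterSplitSketch {
  /** Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE in this sketch. */
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  /** Simplified model: the output watermark is the input watermark capped by the hold. */
  static long outputWatermark(long inputWatermark, long hold) {
    return Math.min(inputWatermark, hold);
  }

  /** An element counts as late here when its timestamp is behind the output watermark. */
  static boolean isLate(long elementTimestamp, long outputWatermark) {
    return elementTimestamp < outputWatermark;
  }

  public static void main(String[] args) {
    long inputWatermark = 1_000L;  // hypothetical upstream watermark
    long residualWatermark = 100L; // watermark of the data still in the reader
    long pendingElement = 150L;    // timestamp of a record the residual will emit

    // Hold set to the maximum timestamp: nothing keeps the output watermark back.
    long skipped = outputWatermark(inputWatermark, MAX_TIMESTAMP);
    // Hold set to the residual's watermark: the output watermark stays behind the record.
    long held = outputWatermark(inputWatermark, residualWatermark);

    System.out.println(isLate(pendingElement, skipped));
    System.out.println(isLate(pendingElement, held));
  }
}
```

In this model the pending record is late only when the hold was released to the maximum timestamp, which is the scenario described in the comment.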


[GitHub] [beam] codecov[bot] edited a comment on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783335600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=h1) Report
   > Merging [#14013](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=desc) (aaad864) into [master](https://codecov.io/gh/apache/beam/commit/c758a0d68d881df40e80f61691315b6f0222008f?el=desc) (c758a0d) will **increase** coverage by `0.02%`.
   > The diff coverage is `92.33%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/14013/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #14013      +/-   ##
   ==========================================
   + Coverage   82.95%   82.98%   +0.02%     
   ==========================================
     Files         469      469              
     Lines       58343    58298      -45     
   ==========================================
   - Hits        48401    48379      -22     
   + Misses       9942     9919      -23     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.95% <ø> (-0.04%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `79.61% <16.66%> (-0.39%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.03% <42.85%> (-1.65%)` | :arrow_down: |
   | [sdks/python/apache\_beam/internal/pickler.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvcGlja2xlci5weQ==) | `86.86% <80.00%> (ø)` | |
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `90.80% <88.05%> (-0.28%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/stats.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9zdGF0cy5weQ==) | `97.33% <97.91%> (+10.61%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/frame\_base.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lX2Jhc2UucHk=) | `90.50% <100.00%> (-0.05%)` | :arrow_down: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.92% <100.00%> (ø)` | |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `88.09% <0.00%> (-7.15%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `91.91% <0.00%> (-2.21%)` | :arrow_down: |
   | ... and [6 more](https://codecov.io/gh/apache/beam/pull/14013/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=footer). Last update [c758a0d...db6dd9b](https://codecov.io/gh/apache/beam/pull/14013?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] boyuanzz commented on pull request #14013: [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #14013:
URL: https://github.com/apache/beam/pull/14013#issuecomment-783572819


   > > The reason why we want to call `watermarkEstimator.setWatermark(currentRestriction.getWatermark());` when `tryClaim()` returns `false` is to track the watermark when returning `ProcessContinuation.resume()`. This can happen when the reader produces no output records and we want to read again later.
   > 
   > That seems incorrect. When tryClaim returns `false`, it is part of the contract to return `ProcessContinuation.done()`:
   > https://github.com/apache/beam/blob/aaad864c9acb22e35050f974a7ac74fb7638f085/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java#L221
   > 
   > 
   > When the reader returns `false`, we do not fail the claim; instead we go through `out[0] == null` in `processElement`.
   > I think there should be no reason to enforce:
   > a) returning `ProcessContinuation.done()`, and
   > b) manually setting the watermark to `BoundedWindow.TIMESTAMP_MAX_VALUE`,
   > because that seems redundant.
   
   Thanks, Jan! I think the unbounded wrapper does some hacks there, but I'll double-check. It would be great if we could improve the implementation.
   
   > You are right that the watermark is read _before_ the call to `trySplit` (I overlooked that). That probably means we _must_ set the watermark both _before_ and _after_ the `tryClaim` loop.
   
   I'm afraid that setting the watermark both _before_ and _after_ the `tryClaim` loop is still not enough. In most cases, every time `tryClaim` is called and there is an output record, the watermark should advance. If we only set the watermark outside the `tryClaim` loop, we will still hold the watermark back to some degree.
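   
   To make the concern concrete, here is a minimal, self-contained sketch of the two placements. It does not use the Beam API: `ToyTracker`, `RecordingEstimator`, `outsideLoopOnly` and `insideLoop` are hypothetical stand-ins, and the sketch assumes that each successful claim advances the tracker's watermark to the claimed record's timestamp.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class WatermarkHoldSketch {
   
       /** Hypothetical stand-in for a watermark estimator; it records every value it is given. */
       static final class RecordingEstimator {
           final List<Long> reported = new ArrayList<>();
   
           void setWatermark(long watermark) {
               reported.add(watermark);
           }
       }
   
       /** Hypothetical stand-in for a tracker: each successful claim advances the watermark. */
       static final class ToyTracker {
           private final long[] timestamps;
           private int next = 0;
           private long watermark = Long.MIN_VALUE;
   
           ToyTracker(long... timestamps) {
               this.timestamps = timestamps;
           }
   
           boolean tryClaim() {
               if (next >= timestamps.length) {
                   return false; // nothing left to claim
               }
               watermark = timestamps[next++];
               return true;
           }
   
           long getWatermark() {
               return watermark;
           }
       }
   
       /** Sets the watermark only before and after the claim loop. */
       static List<Long> outsideLoopOnly(long... timestamps) {
           ToyTracker tracker = new ToyTracker(timestamps);
           RecordingEstimator estimator = new RecordingEstimator();
           estimator.setWatermark(tracker.getWatermark());
           while (tracker.tryClaim()) {
               // A record would be output here; the estimator is not updated.
           }
           estimator.setWatermark(tracker.getWatermark());
           return estimator.reported;
       }
   
       /** Sets the watermark after every successful claim and once more after the failed claim. */
       static List<Long> insideLoop(long... timestamps) {
           ToyTracker tracker = new ToyTracker(timestamps);
           RecordingEstimator estimator = new RecordingEstimator();
           while (tracker.tryClaim()) {
               estimator.setWatermark(tracker.getWatermark());
           }
           estimator.setWatermark(tracker.getWatermark());
           return estimator.reported;
       }
   
       public static void main(String[] args) {
           System.out.println(outsideLoopOnly(10, 20, 30));
           System.out.println(insideLoop(10, 20, 30));
       }
   }
   ```
   
   In this toy model the first variant only ever reports the initial value and the final one, so the intermediate advances to 10 and 20 are invisible to the estimator while the loop is running. The second variant reports each advance as it happens. Whether the real wrapper behaves this way depends on how `UnboundedSourceAsSDFRestrictionTracker` updates its restriction, which the sketch does not model.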

