Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/06 22:50:51 UTC

[GitHub] [beam] chamikaramj opened a new pull request #11607: [BEAM-9430] Makes sure the watermarks returned by estimators are within bounds

chamikaramj opened a new pull request #11607:
URL: https://github.com/apache/beam/pull/11607


   Updates the watermark estimators to make sure the returned watermark is within bounds.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ihji commented on pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
ihji commented on pull request #11607:
URL: https://github.com/apache/beam/pull/11607#issuecomment-628326914


   I can confirm that this PR fixes the issue.





[GitHub] [beam] lukecwik commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r424831998



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
##########
@@ -44,4 +45,19 @@
    * <p>The state returned must not be mutated.
    */
   WatermarkEstimatorStateT getState();
+
+  /**
+   * Validates that a given watermark is within timestamp min and max bounds.
+   *
+   * @param watermark watermark to validate
+   */
+  static void ensureWatermarkWithinBounds(Instant watermark) {

Review comment:
       It makes more sense to add this method to BoundedWindow, as it should apply to a lot more places than just WatermarkEstimators.







[GitHub] [beam] lukecwik commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r422236167



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
##########
@@ -37,14 +37,16 @@
     private Instant lastReportedWatermark;
 
     public Manual(Instant watermark) {
-      this.watermark = checkNotNull(watermark, "watermark must not be null.");
-      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
-          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Provided watermark %s must be within bounds [%s, %s].",
-                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      checkNotNull(watermark, "watermark must not be null.");
+
+      // Making sure that the watermark is within bounds.

Review comment:
       You're right, though it would still be good to switch to BoundedWindow as the import for the static constants.
   
   I think it makes sense to make the constructor validate the bounds and have setWatermark ensure that the value is within the range as expected. We can fix the UnboundedSource SDF wrapper to clamp the watermark value that is being reported from UnboundedReader instead.
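
As a rough illustration of the clamping suggested above, the sketch below pins a reader-reported watermark into the valid range before it would be passed to `setWatermark`. It uses `java.time.Instant` instead of Joda-Time so that it has no dependencies. The `WatermarkClamp` class, the `clamp` helper, and the millisecond values standing in for `BoundedWindow.TIMESTAMP_MIN_VALUE` and `BoundedWindow.TIMESTAMP_MAX_VALUE` are assumptions made for illustration, not Beam code.

```java
import java.time.Instant;

/** Hypothetical helper; not part of the Beam SDK. */
final class WatermarkClamp {
  // Assumed stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and TIMESTAMP_MAX_VALUE.
  static final Instant MIN = Instant.ofEpochMilli(-9223372036854775L);
  static final Instant MAX = Instant.ofEpochMilli(9223372036854775L);

  /** Returns the watermark unchanged if it lies in [MIN, MAX], otherwise the nearest bound. */
  static Instant clamp(Instant watermark) {
    if (watermark.isBefore(MIN)) {
      return MIN;
    }
    if (watermark.isAfter(MAX)) {
      return MAX;
    }
    return watermark;
  }

  public static void main(String[] args) {
    // A watermark one millisecond below the lower bound is pulled back up to it.
    Instant reported = MIN.minusMillis(1);
    System.out.println(clamp(reported).equals(MIN)); // prints "true"
  }
}
```

With this shape, the estimator itself can stay strict and reject out-of-range values, while the wrapper that forwards a reader's watermark does the adjusting.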







[GitHub] [beam] chamikaramj commented on pull request #11607: [BEAM-9430] Makes sure the watermarks returned by estimators are within bounds

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #11607:
URL: https://github.com/apache/beam/pull/11607#issuecomment-624161195


   Closing this temporarily while I look into this further.





[GitHub] [beam] chamikaramj commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r425283694



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
##########
@@ -44,4 +45,19 @@
    * <p>The state returned must not be mutated.
    */
   WatermarkEstimatorStateT getState();
+
+  /**
+   * Validates that a given watermark is within timestamp min and max bounds.
+   *
+   * @param watermark watermark to validate
+   */
+  static void ensureWatermarkWithinBounds(Instant watermark) {

Review comment:
       Done.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +538,12 @@ public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentEleme
     @NewWatermarkEstimator
     public WatermarkEstimators.Manual newWatermarkEstimator(
         @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      // Making sure that the watermark is within bounds.
+      if (watermarkEstimatorState.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      } else if (watermarkEstimatorState.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }

Review comment:
       Added to both.







[GitHub] [beam] lukecwik commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r421827443



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
##########
@@ -37,14 +37,16 @@
     private Instant lastReportedWatermark;
 
     public Manual(Instant watermark) {
-      this.watermark = checkNotNull(watermark, "watermark must not be null.");
-      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
-          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Provided watermark %s must be within bounds [%s, %s].",
-                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      checkNotNull(watermark, "watermark must not be null.");
+
+      // Making sure that the watermark is within bounds.

Review comment:
       `MIN` is the same but `GlobalWindow.MAX = BoundedWindow.MAX - 1 day` as can be seen here: https://github.com/apache/beam/blob/25b4ebc65953af34a0cfe83ed80feee0bc6df4ad/model/pipeline/src/main/proto/beam_runner_api.proto#L38
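
The relationship quoted above can be checked with plain arithmetic. In the sketch below, the three millisecond constants are assumed values (believed to match the linked proto file, but not verified here), and `TimestampBounds` and `gap()` are hypothetical names.

```java
import java.time.Duration;

/** Hypothetical constants holder; check the values against the linked proto before relying on them. */
final class TimestampBounds {
  // Assumed values, in milliseconds since the epoch.
  static final long MIN_TIMESTAMP_MILLIS = -9223372036854775L;
  static final long MAX_TIMESTAMP_MILLIS = 9223372036854775L;
  static final long GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS = 9223371950454775L;

  /** Gap between the overall maximum timestamp and the end of the global window. */
  static Duration gap() {
    return Duration.ofMillis(MAX_TIMESTAMP_MILLIS - GLOBAL_WINDOW_MAX_TIMESTAMP_MILLIS);
  }

  public static void main(String[] args) {
    System.out.println(gap().equals(Duration.ofDays(1))); // prints "true"
  }
}
```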







[GitHub] [beam] chamikaramj commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r424814936



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
##########
@@ -37,18 +37,23 @@
     private Instant lastReportedWatermark;
 
     public Manual(Instant watermark) {
+      validateWatermark(watermark);
       this.watermark = checkNotNull(watermark, "watermark must not be null.");
-      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
-          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+    }
+
+    private void validateWatermark(Instant watermark) {

Review comment:
       Done.







[GitHub] [beam] lukecwik commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r421824248



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
##########
@@ -37,14 +37,16 @@
     private Instant lastReportedWatermark;
 
     public Manual(Instant watermark) {
-      this.watermark = checkNotNull(watermark, "watermark must not be null.");
-      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
-          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Provided watermark %s must be within bounds [%s, %s].",
-                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      checkNotNull(watermark, "watermark must not be null.");
+
+      // Making sure that the watermark is within bounds.

Review comment:
       The bounds are wrong; they should be `BoundedWindow.TIMESTAMP_MIN_VALUE` and `BoundedWindow.TIMESTAMP_MAX_VALUE`. This could have been the cause of the timestamp being out of range. If that is the case, then I would rather keep the bounds as they were and make setWatermark more restrictive. We should also update the javadoc for setWatermark/currentWatermark (on the interfaces) to say that the value must always be within these bounds.
   
   I think it makes more sense to keep the restriction here and also add the restriction within setWatermark, so the user gets an error when setting it.
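
A minimal sketch of the stricter alternative described above, in which both the constructor and `setWatermark` reject out-of-range values. `StrictManualEstimator` and `checkWithinBounds` are hypothetical names, `java.time.Instant` stands in for the Joda-Time type used in the SDK, and the `MIN`/`MAX` millisecond values are assumed stand-ins for the `BoundedWindow` constants.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical manual estimator; mirrors the shape under review but is not Beam code. */
final class StrictManualEstimator {
  // Assumed stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and TIMESTAMP_MAX_VALUE.
  static final Instant MIN = Instant.ofEpochMilli(-9223372036854775L);
  static final Instant MAX = Instant.ofEpochMilli(9223372036854775L);

  private Instant watermark;

  StrictManualEstimator(Instant watermark) {
    this.watermark = checkWithinBounds(watermark);
  }

  /** Fails fast so the caller sees the error at the point where the bad value is set. */
  void setWatermark(Instant watermark) {
    this.watermark = checkWithinBounds(watermark);
  }

  Instant currentWatermark() {
    return watermark;
  }

  private static Instant checkWithinBounds(Instant watermark) {
    Objects.requireNonNull(watermark, "watermark must not be null.");
    if (watermark.isBefore(MIN) || watermark.isAfter(MAX)) {
      throw new IllegalArgumentException(
          String.format(
              "Provided watermark %s must be within bounds [%s, %s].", watermark, MIN, MAX));
    }
    return watermark;
  }
}
```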







[GitHub] [beam] chamikaramj commented on pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #11607:
URL: https://github.com/apache/beam/pull/11607#issuecomment-624959542


   Run Java PreCommit





[GitHub] [beam] chamikaramj commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r421830922



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
##########
@@ -37,14 +37,16 @@
     private Instant lastReportedWatermark;
 
     public Manual(Instant watermark) {
-      this.watermark = checkNotNull(watermark, "watermark must not be null.");
-      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
-          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Provided watermark %s must be within bounds [%s, %s].",
-                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      checkNotNull(watermark, "watermark must not be null.");
+
+      // Making sure that the watermark is within bounds.

Review comment:
       Right, but we are using the constant from the super class here (TIMESTAMP_MAX_VALUE) not END_OF_GLOBAL_WINDOW.
   https://github.com/apache/beam/blob/25b4ebc65953af34a0cfe83ed80feee0bc6df4ad/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java#L39
   
   Also, the error we saw was the following, which points to the min value being off, not the max value:
   java.lang.IllegalArgumentException: Provided watermark -290308-12-21T19:59:05.224Z must be within bounds [-290308-12-21T19:59:05.225Z, 294247-01-10T04:00:54.775Z].
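
The rejected watermark in that message is exactly one millisecond below the lower bound. The thread does not say where the extra millisecond comes from. One possibility, offered only as a guess, is a rounding difference when `Long.MIN_VALUE` microseconds is converted to milliseconds. The sketch below shows that truncating and flooring that conversion give values one millisecond apart. `OffByOneMillis` is a hypothetical class, and the epoch-millisecond value assigned to the lower bound is an assumption.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical demo class; the millisecond value of the lower bound is an assumption. */
final class OffByOneMillis {
  // Assumed epoch-millisecond value behind the lower bound printed in the error message.
  static final long MIN_MILLIS = -9223372036854775L;

  /** Long.MIN_VALUE microseconds converted to milliseconds, rounding toward zero. */
  static long truncatedMillis() {
    return Long.MIN_VALUE / 1000;
  }

  /** The same conversion, rounding toward negative infinity. */
  static long flooredMillis() {
    return Math.floorDiv(Long.MIN_VALUE, 1000L);
  }

  public static void main(String[] args) {
    Instant bound = Instant.ofEpochMilli(truncatedMillis());
    Instant belowBound = Instant.ofEpochMilli(flooredMillis());
    System.out.println(truncatedMillis() == MIN_MILLIS); // prints "true"
    System.out.println(Duration.between(belowBound, bound).toMillis()); // prints "1"
    System.out.println(belowBound.isBefore(bound)); // prints "true"
  }
}
```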
   







[GitHub] [beam] lukecwik commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r424830228



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -537,6 +538,12 @@ public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentEleme
     @NewWatermarkEstimator
     public WatermarkEstimators.Manual newWatermarkEstimator(
         @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      // Making sure that the watermark is within bounds.
+      if (watermarkEstimatorState.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      } else if (watermarkEstimatorState.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+        watermarkEstimatorState = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }

Review comment:
       I was thinking that this logic would go where we call setWatermark above, on line 510.
   
   Note that the initial watermark estimator state is the current element timestamp, which is always between the MIN and MAX timestamp values.







[GitHub] [beam] ihji commented on pull request #11607: [BEAM-9430] Makes sure the watermarks returned by estimators are within bounds

Posted by GitBox <gi...@apache.org>.
ihji commented on pull request #11607:
URL: https://github.com/apache/beam/pull/11607#issuecomment-623860331


   Shouldn't we also disable `IllegalArgumentException`s in the constructors?





[GitHub] [beam] chamikaramj commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r421826896



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
##########
@@ -37,14 +37,16 @@
     private Instant lastReportedWatermark;
 
     public Manual(Instant watermark) {
-      this.watermark = checkNotNull(watermark, "watermark must not be null.");
-      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
-          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Provided watermark %s must be within bounds [%s, %s].",
-                watermark, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.TIMESTAMP_MAX_VALUE));
+      checkNotNull(watermark, "watermark must not be null.");
+
+      // Making sure that the watermark is within bounds.

Review comment:
       What's the difference between those values? It seems like GlobalWindow gets these constants from the superclass BoundedWindow.
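
For context on the question above: in Java, a static field declared on a superclass can be referenced through a subclass name, and both names resolve to the same field. The sketch below uses hypothetical stand-in classes (`ParentWindow`, `ChildWindow`) rather than the real Beam types, and the constant values are assumptions.

```java
import java.time.Instant;

/** Hypothetical stand-in for a base window class; not the Beam class. */
abstract class ParentWindow {
  // Assumed value, for illustration only.
  static final Instant TIMESTAMP_MAX_VALUE = Instant.ofEpochMilli(9223372036854775L);
}

/** Hypothetical stand-in for a subclass that adds its own, smaller constant. */
final class ChildWindow extends ParentWindow {
  // Declared only on the subclass (assumed here to be one day earlier).
  static final Instant END_OF_WINDOW = TIMESTAMP_MAX_VALUE.minusSeconds(86_400);
}

final class InheritedConstantDemo {
  /** True because ChildWindow.TIMESTAMP_MAX_VALUE is the field declared in ParentWindow. */
  static boolean sameConstant() {
    return ChildWindow.TIMESTAMP_MAX_VALUE == ParentWindow.TIMESTAMP_MAX_VALUE;
  }

  public static void main(String[] args) {
    System.out.println(sameConstant()); // prints "true"
    System.out.println(ChildWindow.END_OF_WINDOW.isBefore(ChildWindow.TIMESTAMP_MAX_VALUE)); // prints "true"
  }
}
```

So a check written against the inherited name behaves the same as one written against the parent class. Only a constant declared separately on the subclass would differ.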







[GitHub] [beam] chamikaramj commented on pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #11607:
URL: https://github.com/apache/beam/pull/11607#issuecomment-624959475


   Replaced the exception with a bound adjustment. PTAL.





[GitHub] [beam] ihji commented on a change in pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
ihji commented on a change in pull request #11607:
URL: https://github.com/apache/beam/pull/11607#discussion_r424809435



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java
##########
@@ -37,18 +37,23 @@
     private Instant lastReportedWatermark;
 
     public Manual(Instant watermark) {
+      validateWatermark(watermark);
       this.watermark = checkNotNull(watermark, "watermark must not be null.");
-      if (watermark.isBefore(GlobalWindow.TIMESTAMP_MIN_VALUE)
-          || watermark.isAfter(GlobalWindow.TIMESTAMP_MAX_VALUE)) {
+    }
+
+    private void validateWatermark(Instant watermark) {

Review comment:
       How about moving this to the parent scope and making it static, so that all three estimators can share the method?







[GitHub] [beam] lukecwik merged pull request #11607: [BEAM-9430] Fixes the bounds of initial watermark set to estimators instead of raising an error

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11607:
URL: https://github.com/apache/beam/pull/11607


   

