Posted to issues@flink.apache.org by "haishui126 (via GitHub)" <gi...@apache.org> on 2023/03/29 03:20:11 UTC

[GitHub] [flink] haishui126 opened a new pull request, #22291: [FLINK-31632] maxAllowedWatermark overflow

haishui126 opened a new pull request, #22291:
URL: https://github.com/apache/flink/pull/22291

   ## What is the purpose of the change
   
   This PR aims to fix the maxAllowedWatermark overflow that occurs when the source is idle, which prevents the source from becoming active again.
   
   
   ## Brief change log
   
   Check whether computing maxAllowedWatermark overflows; if so, set it to Long.MAX_VALUE.
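The change log entry above amounts to a saturating addition. Below is a minimal sketch, assuming the drift is non-negative; the class and method names are hypothetical and this is not the actual `SourceCoordinator` code.

```java
final class MaxAllowedWatermark {
    private MaxAllowedWatermark() {}

    /**
     * Returns combinedWatermark + maxAllowedDrift, saturating at Long.MAX_VALUE
     * instead of wrapping around to a negative value.
     */
    static long compute(long combinedWatermark, long maxAllowedDrift) {
        try {
            // Math.addExact throws ArithmeticException if the sum overflows a long.
            return Math.addExact(combinedWatermark, maxAllowedDrift);
        } catch (ArithmeticException e) {
            // With a non-negative drift, overflow can only happen upwards, e.g. when
            // every subtask is idle and the combined watermark is Long.MAX_VALUE.
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long drift = 1000L;
        // Plain addition wraps around to a large negative number.
        System.out.println(Long.MAX_VALUE + drift);
        // The saturating version stays at Long.MAX_VALUE.
        System.out.println(compute(Long.MAX_VALUE, drift));
        // Normal case: 42 + 1000.
        System.out.println(compute(42L, drift));
    }
}
```

With the plain `+`, an idle group (combined watermark `Long.MAX_VALUE`) would announce a negative maximum, which is the overflow this PR targets.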
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no) no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) no
     - The serializers: (yes / no / don't know) no
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know) no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) no
     - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no) no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] tzulitai closed pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai closed pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle
URL: https://github.com/apache/flink/pull/22291




[GitHub] [flink] haishui126 commented on a diff in pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159239249


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -167,6 +167,9 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent

Review Comment:
   Yes! I have just read the code and found that `currentMaxDesiredWatermark` is maxWatermark by default, which is why the old test passed. I'm not sure whether it's right to write it like this now.





[GitHub] [flink] haishui126 commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159403268


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -167,6 +167,9 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent

Review Comment:
   I think so, and I have fixed it.





[GitHub] [flink] tzulitai commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1167082068


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -160,13 +160,18 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             expectedOutput.add(record1);
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, record1);
+            // mock WatermarkAlignmentEvent from SourceCoordinator
+            operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
             assertOutput(actualOutput, expectedOutput);
             assertTrue(operator.isAvailable());
 
             // source becomes idle, it should report Long.MAX_VALUE as the watermark
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent
+            // because reported Long.MAX_VALUE watermark + maxAllowedWatermarkDrift will overflow

Review Comment:
   Probably don't need this comment; i.e. it's too much of an implementation detail.
   
   Or maybe we just make note of the general contract between coordinator <--> subtasks: If all source subtasks of the watermark group are idle, then the coordinator will report `Long.MAX_VALUE`.
   
   Whether or not there was arithmetic overflow isn't really a concern here, so I would like to avoid excessive comments.
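The contract described above can be sketched as follows. This is an illustration only: it assumes, hypothetically, that the coordinator combines the subtask watermarks of a group by taking their minimum and that an idle subtask reports `Long.MAX_VALUE` (as the test comment states). The class and method names are made up for this sketch.

```java
import java.util.List;

final class IdleContract {
    private IdleContract() {}

    /** Hypothetical combination rule: the group watermark is the minimum reported watermark. */
    static long combinedWatermark(List<Long> reported) {
        long min = Long.MAX_VALUE;
        for (long w : reported) {
            min = Math.min(min, w);
        }
        return min;
    }

    /** Value announced to subtasks: combined watermark plus drift, saturating at Long.MAX_VALUE. */
    static long announced(List<Long> reported, long maxAllowedDrift) {
        long combined = combinedWatermark(reported);
        try {
            return Math.addExact(combined, maxAllowedDrift);
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long idle = Long.MAX_VALUE;
        // One active subtask at 42: the group is held to 42 + 1000.
        System.out.println(announced(List.of(42L, idle), 1000L));
        // All subtasks idle: the coordinator announces Long.MAX_VALUE.
        System.out.println(announced(List.of(idle, idle), 1000L));
    }
}
```

The point of the contract is the second case: a test only needs to know that an all-idle group yields `Long.MAX_VALUE`, not how the addition avoids overflowing.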



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,18 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =
+                    Math.addExact(
+                            globalCombinedWatermark.getTimestamp(),
+                            watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
+        } catch (ArithmeticException e) {

Review Comment:
   Could `ArithmeticException` be thrown for any reason other than overflow? If yes, it could be a bit dangerous to treat it like this.
   
   Would it be sufficient to handle it like this?:
   ```
   long maxAllowedWatermark = (globalCombinedWatermark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp())
       ? globalCombinedWatermark.getTimestamp() + watermarkAlignmentParams.getMaxAllowedWatermarkDrift()
       : Watermark.MAX_WATERMARK.getTimestamp();
   ```
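For reference, the suggestion above can be tried in isolation. The sketch below is a standalone version with hypothetical names; it substitutes `Long.MAX_VALUE` for `Watermark.MAX_WATERMARK.getTimestamp()`, assuming the idle-source sentinel is `Long.MAX_VALUE` as the test comment in this thread says. On the question itself: the `Math.addExact(long, long)` Javadoc documents `ArithmeticException` only for the case where the result overflows a `long`.

```java
final class SentinelCheck {
    private SentinelCheck() {}

    // Stand-in for Watermark.MAX_WATERMARK.getTimestamp() (assumed to be Long.MAX_VALUE).
    static final long MAX_WATERMARK_TIMESTAMP = Long.MAX_VALUE;

    /** Adds the drift unless the combined watermark is exactly the "all idle" sentinel. */
    static long maxAllowedWatermark(long combinedWatermark, long maxAllowedDrift) {
        return combinedWatermark != MAX_WATERMARK_TIMESTAMP
                ? combinedWatermark + maxAllowedDrift
                : MAX_WATERMARK_TIMESTAMP;
    }

    public static void main(String[] args) {
        System.out.println(maxAllowedWatermark(42L, 1000L));
        System.out.println(maxAllowedWatermark(Long.MAX_VALUE, 1000L));
    }
}
```

This handles the idle case exactly, because the sentinel is the only value it special-cases; any other timestamp is still added without an overflow check.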





[GitHub] [flink] haishui126 commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1498458381

   > @haishui126 can we add a more meaningful commit message such as "[FLINK-XYZ] Fix maxAllowedWatermark arithmetic overflow when the source is idle" or something similar?
   
   Absolutely! "[FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle" sounds great! Should we edit the commit message when merging this PR, or should I re-push?




[GitHub] [flink] mas-chen commented on a diff in pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159221918


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -167,6 +167,9 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent

Review Comment:
   I think you want to check the `WatermarkAlignmentEvent` and then assert that the maxWatermark is correct?





[GitHub] [flink] pltbkd commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "pltbkd (via GitHub)" <gi...@apache.org>.
pltbkd commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159523718


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,16 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =

Review Comment:
   nit: maybe we can check with `Long.MAX_VALUE-globalCombinedWatermark.getTimestamp() >= watermarkAlignmentParams.getMaxAllowedWatermarkDrift()`, instead of using the try-catch? 
   Or maybe it's better to add a comment explaining why an ArithmeticException can be thrown here.
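A quick way to evaluate the suggested guard is to run it on a few timestamps. The sketch below (hypothetical names) implements `Long.MAX_VALUE - ts >= drift` literally as a "the sum fits" test. It behaves as intended for non-negative timestamps, but for a negative timestamp such as `Long.MIN_VALUE` the subtraction itself overflows.

```java
final class SubtractionGuard {
    private SubtractionGuard() {}

    /** Literal form of the suggested check: true means "adding the drift is safe". */
    static boolean naiveFits(long combined, long drift) {
        return Long.MAX_VALUE - combined >= drift;
    }

    public static void main(String[] args) {
        System.out.println(naiveFits(42L, 1000L));            // true: 42 + 1000 fits
        System.out.println(naiveFits(Long.MAX_VALUE, 1000L)); // false: would overflow
        // Long.MAX_VALUE - Long.MIN_VALUE wraps to -1, so the check wrongly
        // reports an overflow even though Long.MIN_VALUE + 1000 fits.
        System.out.println(naiveFits(Long.MIN_VALUE, 1000L));
    }
}
```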





[GitHub] [flink] mas-chen commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159300038


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -167,6 +167,9 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent

Review Comment:
   Oh, I see. Does it make sense to call `operator.handleOperatorEvent(new WatermarkAlignmentEvent(...))` after every `assertLatestReportedWatermarkEvent` (line 162)?





[GitHub] [flink] flinkbot commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1487893751

   ## CI report:
   
   * 27545880e14511404102f8c5390a03d152b8e03e UNKNOWN
   
   Bot commands:
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] haishui126 commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1167684957


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -160,13 +160,18 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             expectedOutput.add(record1);
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, record1);
+            // mock WatermarkAlignmentEvent from SourceCoordinator
+            operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
             assertOutput(actualOutput, expectedOutput);
             assertTrue(operator.isAvailable());
 
             // source becomes idle, it should report Long.MAX_VALUE as the watermark
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent
+            // because reported Long.MAX_VALUE watermark + maxAllowedWatermarkDrift will overflow

Review Comment:
   I agree with you. I'll edit the comment later.





[GitHub] [flink] haishui126 commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1160405755


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,16 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =

Review Comment:
   I added a comment to explain the ArithmeticException.





[GitHub] [flink] haishui126 commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1491451371

   > The change makes sense. Can you add a unit test to verify the regression is fixed?
   > 
   > I also think there needs to be another change to the `shouldWaitForAlignment()` to not wait on max watermark
   
   After fixing the overflow, `currentMaxDesiredWatermark` = `lastEmittedWatermark` = Long.MAX_VALUE, so `shouldWaitForAlignment()` returns false and the idle source keeps READING. It looks like it doesn't need to be changed.
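The reasoning above can be illustrated with a simplified model. Assume, hypothetically, that `shouldWaitForAlignment()` reduces to `currentMaxDesiredWatermark < lastEmittedWatermark`; the real `SourceOperator` logic may track more state. Under that assumption, the overflowed value makes the idle source wait indefinitely, while the saturated value lets it keep reading.

```java
final class AlignmentWaitModel {
    private AlignmentWaitModel() {}

    /** Hypothetical simplification of SourceOperator#shouldWaitForAlignment(). */
    static boolean shouldWaitForAlignment(long currentMaxDesiredWatermark, long lastEmittedWatermark) {
        return currentMaxDesiredWatermark < lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long drift = 1000L;
        long lastEmitted = Long.MAX_VALUE; // an idle source reports Long.MAX_VALUE

        long overflowed = Long.MAX_VALUE + drift; // wraps to a negative value
        long saturated = Long.MAX_VALUE;          // value announced after the fix

        System.out.println(shouldWaitForAlignment(overflowed, lastEmitted)); // true: blocked
        System.out.println(shouldWaitForAlignment(saturated, lastEmitted));  // false: keeps reading
    }
}
```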




[GitHub] [flink] haishui126 commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1167685236


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,18 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =
+                    Math.addExact(
+                            globalCombinedWatermark.getTimestamp(),
+                            watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
+        } catch (ArithmeticException e) {

Review Comment:
   This code solves the problem when a source is idle. I'm worried there may be other cases that cause `globalCombinedWatermark.getTimestamp() + watermarkAlignmentParams.getMaxAllowedWatermarkDrift()` to overflow.
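One way to make that concern concrete: a combined watermark that is close to, but not equal to, `Long.MAX_VALUE` still overflows. In the sketch below (hypothetical names, drift of 1000 chosen for illustration), a check that only special-cases the exact sentinel wraps to a negative value, while `Math.addExact` detects the overflow.

```java
final class NearMaxOverflow {
    private NearMaxOverflow() {}

    /** Special-cases only the exact Long.MAX_VALUE sentinel. */
    static long sentinelOnly(long combined, long drift) {
        return combined != Long.MAX_VALUE ? combined + drift : Long.MAX_VALUE;
    }

    /** Saturates on any overflow, as detected by Math.addExact. */
    static long saturating(long combined, long drift) {
        try {
            return Math.addExact(combined, drift);
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long nearMax = Long.MAX_VALUE - 10;
        System.out.println(sentinelOnly(nearMax, 1000L)); // wraps to a negative value
        System.out.println(saturating(nearMax, 1000L));   // stays at Long.MAX_VALUE
    }
}
```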





[GitHub] [flink] tzulitai commented on pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1516646827

   Merging ...




[GitHub] [flink] pltbkd commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "pltbkd (via GitHub)" <gi...@apache.org>.
pltbkd commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1160621805


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,16 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =

Review Comment:
   Thanks for the update! LGTM now.





[GitHub] [flink] mas-chen commented on a diff in pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1159178247


##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java:
##########
@@ -83,6 +83,22 @@ void testWatermarkAlignmentWithIdleness() throws Exception {
             reportWatermarkEvent(sourceCoordinator1, subtask0, 42);
             assertLatestWatermarkAlignmentEvent(subtask0, 1042);
             assertLatestWatermarkAlignmentEvent(subtask1, 1042);
+
+            // all subtask becomes idle

Review Comment:
   Looks great!





[GitHub] [flink] mas-chen commented on pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1498516945

   > > arithmetic overflow when the source is idle" or something similar?
   > 
   > Absolutely! "[[FLINK-31632](https://issues.apache.org/jira/browse/FLINK-31632)] Fix maxAllowedWatermark arithmetic overflow when the source is idle" sounds great! Do we edit the commit message on merging this pr or I re-push?
   
   Let's try to do that before merging. You can squash the commits into one and amend the commit message. It will make it easier for the committer to help merge =)




[GitHub] [flink] aristofun commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "aristofun (via GitHub)" <gi...@apache.org>.
aristofun commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1496586609

   Hi there. Thanks for your amazing work on fixing this. Is there an ETA for when it can get into the 1.16 branch?




[GitHub] [flink] haishui126 commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1160388130


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,16 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =

Review Comment:
   Thanks for the suggestion, but `Long.MAX_VALUE-globalCombinedWatermark.getTimestamp()` will overflow when `globalCombinedWatermark.getTimestamp()` is negative. If it's necessary to replace the try-catch, we can use: 
   ```java
           long maxAllowedWatermark =
                   globalCombinedWatermark.getTimestamp()
                           + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
           // check maxAllowedWatermark arithmetic overflow when the source is idle
           if (((globalCombinedWatermark.getTimestamp() ^ maxAllowedWatermark)
                           & (watermarkAlignmentParams.getMaxAllowedWatermarkDrift()
                                   ^ maxAllowedWatermark))
                   < 0) {
               maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
           }
   ```
   WDYT?
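The bit expression above is the classic two's-complement overflow test, and it is the same check OpenJDK's `Math.addExact(long, long)` performs internally: a sum overflows exactly when both operands have the opposite sign of the result. The sketch below, with hypothetical class and method names, compares the two over a handful of boundary values.

```java
final class BitOverflowCheck {
    private BitOverflowCheck() {}

    /** Overflow test on the wrapped sum r = x + y (two's complement). */
    static boolean overflowed(long x, long y) {
        long r = x + y;
        // Negative iff both x and y have a sign bit different from r's.
        return ((x ^ r) & (y ^ r)) < 0;
    }

    /** Reference answer using Math.addExact. */
    static boolean overflowsExact(long x, long y) {
        try {
            Math.addExact(x, y);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long[] values = {Long.MIN_VALUE, -1000L, -1L, 0L, 1L, 1000L, Long.MAX_VALUE - 10, Long.MAX_VALUE};
        int mismatches = 0;
        for (long x : values) {
            for (long y : values) {
                if (overflowed(x, y) != overflowsExact(x, y)) {
                    mismatches++;
                }
            }
        }
        System.out.println("mismatches: " + mismatches);
    }
}
```

Since the drift is non-negative, only upward overflow can occur in `announceCombinedWatermark()`, so mapping every detected overflow to `Watermark.MAX_WATERMARK.getTimestamp()` gives the same result as the try-catch version.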





[GitHub] [flink] tzulitai commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "tzulitai (via GitHub)" <gi...@apache.org>.
tzulitai commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1167689028


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,18 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =
+                    Math.addExact(
+                            globalCombinedWatermark.getTimestamp(),
+                            watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
+        } catch (ArithmeticException e) {

Review Comment:
   Fair enough, I'm fine with leaving it as is then.





[GitHub] [flink] pltbkd commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "pltbkd (via GitHub)" <gi...@apache.org>.
pltbkd commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1160398514


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,16 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =

Review Comment:
   You are right, and I suppose `globalCombinedWatermark.getTimestamp() > 0 && Long.MAX_VALUE - globalCombinedWatermark.getTimestamp() < watermarkAlignmentParams.getMaxAllowedWatermarkDrift()` would be fine?
   I think it's better to make clear why and how it is checked. So if the simple expression is not enough, I would actually prefer keeping the try-catch with a comment over the bit operation.
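The refined condition can be checked the same way. Below is a sketch (hypothetical names) of `ts > 0 && Long.MAX_VALUE - ts < drift` used as an overflow test. Restricting the subtraction to positive timestamps keeps it from overflowing, and for a non-negative drift the result agrees with `Math.addExact`.

```java
final class GuardedOverflowCheck {
    private GuardedOverflowCheck() {}

    /** True if combined + drift would overflow; assumes drift >= 0. */
    static boolean overflows(long combined, long drift) {
        // For combined <= 0 and drift >= 0 the sum cannot exceed Long.MAX_VALUE.
        // For combined > 0, Long.MAX_VALUE - combined cannot overflow.
        return combined > 0 && Long.MAX_VALUE - combined < drift;
    }

    /** Reference answer using Math.addExact. */
    static boolean overflowsExact(long combined, long drift) {
        try {
            Math.addExact(combined, drift);
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {Long.MIN_VALUE, -5L, 0L, 42L, Long.MAX_VALUE - 10, Long.MAX_VALUE};
        for (long ts : timestamps) {
            System.out.println(ts + " -> " + overflows(ts, 1000L) + " / " + overflowsExact(ts, 1000L));
        }
    }
}
```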





[GitHub] [flink] MartijnVisser commented on pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

Posted by "MartijnVisser (via GitHub)" <gi...@apache.org>.
MartijnVisser commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1508164176

   @mas-chen Are you OK with it? If so, @tzulitai do you want to have a final look before merging it?




[GitHub] [flink] haishui126 commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "haishui126 (via GitHub)" <gi...@apache.org>.
haishui126 commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1491418101

   I have added a test based on [FLINK-25982](https://issues.apache.org/jira/browse/FLINK-25982).
   Is there anything else I need to do?




[GitHub] [flink] mas-chen commented on pull request #22291: [FLINK-31632] maxAllowedWatermark overflow

Posted by "mas-chen (via GitHub)" <gi...@apache.org>.
mas-chen commented on PR #22291:
URL: https://github.com/apache/flink/pull/22291#issuecomment-1498406007

   @haishui126 can we add a more meaningful commit message such as "[FLINK-XYZ] Fix maxAllowedWatermark arithmetic overflow when the source is idle" or something similar?

