Posted to issues@flink.apache.org by "tzulitai (via GitHub)" <gi...@apache.org> on 2023/04/14 17:04:20 UTC

[GitHub] [flink] tzulitai commented on a diff in pull request #22291: [FLINK-31632] Fix maxAllowedWatermark arithmetic overflow when the source is idle

tzulitai commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1167082068


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest.java:
##########
@@ -160,13 +160,18 @@ public void testWatermarkAlignmentWithIdleness() throws Exception {
             expectedOutput.add(record1);
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, record1);
+            // mock WatermarkAlignmentEvent from SourceCoordinator
+            operator.handleOperatorEvent(new WatermarkAlignmentEvent(record1 + 100));
             assertOutput(actualOutput, expectedOutput);
             assertTrue(operator.isAvailable());
 
             // source becomes idle, it should report Long.MAX_VALUE as the watermark
             assertThat(operator.emitNext(actualOutput), is(DataInputStatus.NOTHING_AVAILABLE));
             context.getTimeService().advance(1);
             assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
+            // receive Long.MAX_VALUE as WatermarkAlignmentEvent
+            // because reported Long.MAX_VALUE watermark + maxAllowedWatermarkDrift will overflow

Review Comment:
   We probably don't need this comment; it's too much of an implementation detail.
   
   Or maybe we just make note of the general contract between coordinator <--> subtasks: If all source subtasks of the watermark group are idle, then the coordinator will report `Long.MAX_VALUE`.
   
   Whether or not there was arithmetic overflow isn't really a concern here, so I would like to avoid excessive comments.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,18 @@ void announceCombinedWatermark() {
                                     aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =
+                    Math.addExact(
+                            globalCombinedWatermark.getTimestamp(),
+                            watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
+        } catch (ArithmeticException e) {

Review Comment:
   Could `ArithmeticException` be thrown for any reason other than overflow? If so, it could be a bit dangerous to treat every occurrence as an overflow.
   
   Would it be sufficient to handle it like this?
   ```
   long maxAllowedWatermark = (globalCombinedWatermark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp())
       ? globalCombinedWatermark.getTimestamp() + watermarkAlignmentParams.getMaxAllowedWatermarkDrift()
       : Watermark.MAX_WATERMARK.getTimestamp();
   ```
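
To make the trade-off concrete, here is a minimal standalone sketch comparing the unchecked addition, the explicit guard suggested above, and the `Math.addExact` approach from the diff. It does not use Flink classes: `MAX_WATERMARK_TIMESTAMP` is a hypothetical stand-in for `Watermark.MAX_WATERMARK.getTimestamp()` and is assumed to equal `Long.MAX_VALUE`, and the class and method names are illustrative only.

```java
public class MaxAllowedWatermarkSketch {

    // Assumption: stand-in for Watermark.MAX_WATERMARK.getTimestamp().
    static final long MAX_WATERMARK_TIMESTAMP = Long.MAX_VALUE;

    // Unchecked addition: silently wraps around when the sum exceeds Long.MAX_VALUE.
    static long naive(long combinedWatermark, long maxAllowedDrift) {
        return combinedWatermark + maxAllowedDrift;
    }

    // Explicit guard: only the "all subtasks idle" sentinel is special-cased.
    static long guarded(long combinedWatermark, long maxAllowedDrift) {
        return combinedWatermark != MAX_WATERMARK_TIMESTAMP
                ? combinedWatermark + maxAllowedDrift
                : MAX_WATERMARK_TIMESTAMP;
    }

    // Math.addExact variant: any overflow of the sum is mapped to the maximum.
    static long saturating(long combinedWatermark, long maxAllowedDrift) {
        try {
            return Math.addExact(combinedWatermark, maxAllowedDrift);
        } catch (ArithmeticException e) {
            // Math.addExact throws only when the result overflows a long. Note that
            // this also covers negative overflow, which this sketch does not treat
            // separately.
            return MAX_WATERMARK_TIMESTAMP;
        }
    }

    public static void main(String[] args) {
        long drift = 100L;
        System.out.println(naive(Long.MAX_VALUE, drift));      // wraps to a negative value
        System.out.println(guarded(Long.MAX_VALUE, drift));    // stays at Long.MAX_VALUE
        System.out.println(saturating(Long.MAX_VALUE, drift)); // stays at Long.MAX_VALUE
        System.out.println(guarded(1_000L, drift));            // ordinary case: 1100
    }
}
```

In this sketch the guard handles only the exact sentinel value: a combined watermark slightly below `Long.MAX_VALUE` would still wrap, whereas the `Math.addExact` variant catches that case too. Whether such values can occur in practice depends on the coordinator's contract, which this sketch does not model.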


