Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/28 13:22:22 UTC

[GitHub] [flink] pnowojski commented on a diff in pull request #20337: [FLINK-28504][streaming] Update watermarks across multiple oneinputstreams.

pnowojski commented on code in PR #20337:
URL: https://github.com/apache/flink/pull/20337#discussion_r932200661


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java:
##########
@@ -235,7 +252,12 @@ public void emitRecord(StreamRecord<IN> record) throws Exception {
 
         @Override
         public void emitWatermark(Watermark watermark) throws Exception {
-            watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
+            outputList.stream()
+                    .filter(output -> output.jobVertexID.equals(jobVertexID))
+                    .forEach(
+                            output ->
+                                    output.watermarkGauge.setCurrentWatermark(
+                                            watermark.getTimestamp()));

Review Comment:
   This looks wrong:
   
   1. Accessing a static field like that rings warning bells.
   2. Why should emitting a watermark in one subtask bump the watermark gauges of other subtasks? That sounds like a very incorrect thing to do. `WatermarkGauge` is not supposed to present the max watermark across all subtasks, but the watermark of that given subtask. If something needs to aggregate the values and get the "max", this should happen at a higher level, like the REST API or the WebUI.
   3. Are you sure that job vertex IDs are unique across multiple jobs running on the same cluster?
   4. It looks like you are creating a memory leak: nothing is ever removed from `outputList`.
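The separation asked for in points 1, 2 and 4 can be sketched in plain Java, without Flink dependencies. `SubtaskWatermarkGauge` and `WatermarkAggregation` are hypothetical names for illustration, not Flink classes. Each subtask owns its own gauge instance, so emitting a watermark touches only that subtask's value, no static list is needed, and nothing accumulates across subtasks or jobs. Any "max" is computed separately, on demand, from the per-subtask values, which is roughly the role the comment assigns to the REST API or WebUI. Using `Long.MIN_VALUE` as the "no watermark yet" value is an assumption of this sketch.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical per-subtask gauge: one instance per subtask, created and
// owned together with that subtask's output, so no static registry is needed.
final class SubtaskWatermarkGauge {
    // Assumption: Long.MIN_VALUE means "no watermark received yet".
    private volatile long currentWatermark = Long.MIN_VALUE;

    // Called only by the owning subtask when it emits a watermark.
    void setCurrentWatermark(long watermark) {
        this.currentWatermark = watermark;
    }

    long getValue() {
        return currentWatermark;
    }
}

// Hypothetical higher-level aggregation, e.g. something a REST handler or
// UI could compute from the per-subtask values it has collected.
final class WatermarkAggregation {
    private WatermarkAggregation() {}

    // Returns the max watermark across the given gauges, or empty if none.
    static OptionalLong maxWatermark(List<SubtaskWatermarkGauge> gauges) {
        return gauges.stream().mapToLong(SubtaskWatermarkGauge::getValue).max();
    }
}
```

With two gauges where only the first receives `setCurrentWatermark(1000L)`, the second still reports `Long.MIN_VALUE`, while `maxWatermark` over both returns 1000. The aggregate is derived on read instead of being written into every subtask's gauge.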



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.