Posted to issues@flink.apache.org by "YeAble (Jira)" <ji...@apache.org> on 2022/06/14 01:04:00 UTC

[jira] [Updated] (FLINK-28033) find and output new min watermark may be wrong with multiple channels

     [ https://issues.apache.org/jira/browse/FLINK-28033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YeAble updated FLINK-28033:
---------------------------
    Affects Version/s:     (was: 1.15.0)

> find and output new min watermark may be wrong with multiple channels
> ---------------------------------------------------------------------
>
>                 Key: FLINK-28033
>                 URL: https://issues.apache.org/jira/browse/FLINK-28033
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>            Reporter: YeAble
>            Priority: Major
>
> File: StatusWatermarkValve.java
> Method:  findAndOutputNewMinWatermarkAcrossAlignedChannels
> {code:java}
> long newMinWatermark = Long.MAX_VALUE;
> boolean hasAlignedChannels = false;
> // determine new overall watermark by considering only watermark-aligned channels across all
> // channels
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         hasAlignedChannels = true;
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> // we acknowledge and output the new overall watermark if it really is aggregated
> // from some remaining aligned channel, and is also larger than the last output watermark
> if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>     lastOutputWatermark = newMinWatermark;
>     output.emitWatermark(new Watermark(lastOutputWatermark));
> } {code}
> Each channelStatus's watermark is initialized to Long.MIN_VALUE. When one channel's watermark advances but the other channels' watermarks have not changed, newMinWatermark remains Long.MIN_VALUE, so the condition newMinWatermark > lastOutputWatermark never holds and no watermark is emitted.
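
The reported behavior can be reproduced outside Flink with a minimal sketch. The class MinWatermarkSketch and its nested ChannelStatus are hypothetical stand-ins, not Flink's actual classes, and the method returns the emitted value instead of calling output.emitWatermark. The second call assumes the lagging channel gets flagged as not watermark-aligned (for example because it was marked idle), which the quoted loop would then skip.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for the valve logic quoted above; not Flink's actual class.
class MinWatermarkSketch {

    // Hypothetical per-channel state, mirroring the two fields the quoted loop reads.
    static final class ChannelStatus {
        long watermark = Long.MIN_VALUE;   // initial value, as stated in the report
        boolean isWatermarkAligned = true;
    }

    long lastOutputWatermark = Long.MIN_VALUE;

    // Same min-over-aligned-channels logic as the quoted method, but the
    // watermark that would be emitted is returned rather than sent to an output.
    OptionalLong findNewMinWatermark(List<ChannelStatus> channelStatuses) {
        long newMinWatermark = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;
        for (ChannelStatus channelStatus : channelStatuses) {
            if (channelStatus.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
            }
        }
        if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
            lastOutputWatermark = newMinWatermark;
            return OptionalLong.of(newMinWatermark);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        ChannelStatus a = new ChannelStatus();
        ChannelStatus b = new ChannelStatus();
        MinWatermarkSketch valve = new MinWatermarkSketch();

        // Only channel a advances; b still holds Long.MIN_VALUE, so the minimum
        // is Long.MIN_VALUE and nothing is emitted.
        a.watermark = 100L;
        System.out.println(valve.findNewMinWatermark(List.of(a, b))); // OptionalLong.empty

        // Assumption: b is excluded from alignment, so only a contributes.
        b.isWatermarkAligned = false;
        System.out.println(valve.findNewMinWatermark(List.of(a, b))); // OptionalLong[100]
    }
}
```

In this sketch the overall watermark is held back for as long as any aligned channel still reports Long.MIN_VALUE. Whether that is a defect or the intended minimum-across-inputs semantics is the question this issue raises.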



--
This message was sent by Atlassian Jira
(v8.20.7#820007)