You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/10/02 09:27:00 UTC

[jira] [Closed] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned chhanels

     [ https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-7721.
-----------------------------------
       Resolution: Fixed
    Fix Version/s:     (was: 1.2.2)

Fixed on release-1.3 in
b67bc4da568df08f5a27f5a84546b54c85b5bb7a

Fixed on master in 
1de47316405a255d3fa21f684ea1500ae59aa9e4

[~tzulitai] Flink 1.2.x doesn't have the valve, right? Or would the fix go into a different part?

> StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned chhanels
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7721
>                 URL: https://issues.apache.org/jira/browse/FLINK-7721
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.1, 1.4.0, 1.3.2
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> Context:
> {code}
> long newMinWatermark = Long.MAX_VALUE;
> for (InputChannelStatus channelStatus : channelStatuses) {
>     if (channelStatus.isWatermarkAligned) {
>         newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>     }
> }
> {code}
> In the calculation of the new min watermark in {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, there is not verification that the calculated new min watermark {{newMinWatermark}} really is aggregated from some aligned channel.
> In the corner case where all input channels are currently not aligned but actually some are active, we would then incorrectly determine that the final aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that.
> The fix would simply be to only emit the aggregated watermark IFF it was really calculated from some aligned input channel (as well as the already existing constraint that it needs to be larger than the last emitted watermark). This change should also safely cover the case that a {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)