Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2019/01/23 09:22:00 UTC

[jira] [Updated] (FLINK-11408) ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge

     [ https://issues.apache.org/jira/browse/FLINK-11408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-11408:
-----------------------------------
    Component/s: DataStream API

> ContinuousProcessingTimeTrigger: NPE on clear() and state is lost on merge
> --------------------------------------------------------------------------
>
>                 Key: FLINK-11408
>                 URL: https://issues.apache.org/jira/browse/FLINK-11408
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>         Environment: Both bugs are reproduced in [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs]
> This is running Flink 1.7.1 locally.
>            Reporter: Cristian
>            Priority: Major
>
> I was testing session windows using processing time and found a couple of problems with the 
> ContinuousProcessingTimeTrigger. The first one is an NPE in the clear method:
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug1.java]
> The second one, which is most likely related to the first and probably its root cause, is that the way state is merged when windows are merged somehow confuses the trigger, so it stops firing:
>  
> [https://github.com/casidiablo/flink-continuous-processing-trigger-bugs/blob/master/src/main/java/flink/bug/Bug2.java]
>  
> I solved both of these problems by using a modified version of 
> ContinuousProcessingTimeTrigger that does NOT call `ctx.mergePartitionedState(stateDesc);` in its onMerge method.
> This is what I understand happens at the trigger level:
>  * If no fire time has been set yet, the first element in the stream sets an initial fire time (the logic is in
> ContinuousProcessingTimeTrigger#onElement()).
>  * From then on, ContinuousProcessingTimeTrigger#onProcessingTime() is responsible for scheduling the next firing.
>  * When windows are merged (as happens with session windows), the clear and merge methods seem to be called with the wrong window namespace (I think this is the root cause of the bug, but I'm not too familiar with that code).
>  * Because the state is not cleared properly in the new window namespace, the previously scheduled timer fires against the window that was already cleared.
>  * Moreover, the new window inherits the state of the previous window, which means that:
>  ## onElement() will NOT schedule a new firing, because a fire time is already set
>  ## onProcessingTime() will never be called at all
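The sequence described in the issue can be illustrated without Flink in a small model. This is a hypothetical, simplified sketch, not Flink code: TriggerModel, mergeThenElement and the string window names are invented for illustration, fireTime stands in for the trigger's min-reducing fire-time state, and timers stands in for registered processing-time timers. It assumes that when an element causes a merge, the window operator calls onMerge for the merged window, then clear for each window that was merged away, then onElement for the merged window, and that clear unboxes the stored fire time without a null check. With mergeState = true it reproduces both symptoms; with mergeState = false it models the reporter's workaround of not merging state in onMerge.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, Flink-free model of the trigger bookkeeping described in this issue.
 * Windows are plain strings, fireTime stands in for the trigger's min-reducing
 * fire-time state, and timers stands in for registered processing-time timers.
 */
class TriggerModel {
    final long interval;
    final boolean mergeState; // true: onMerge merges state; false: the reporter's workaround
    final Map<String, Long> fireTime = new HashMap<>();
    final Map<String, Set<Long>> timers = new HashMap<>();

    TriggerModel(long interval, boolean mergeState) {
        this.interval = interval;
        this.mergeState = mergeState;
    }

    /** Registers a fire time and a timer only if the window has no fire time yet. */
    void onElement(String window, long now) {
        if (fireTime.get(window) == null) {
            long next = now - (now % interval) + interval;
            timers.computeIfAbsent(window, w -> new HashSet<>()).add(next);
            fireTime.put(window, next);
        }
    }

    /**
     * With mergeState, the target receives the minimum fire time and the source
     * windows lose theirs; note that no timer is registered for the target.
     */
    void onMerge(String target, Collection<String> sources) {
        if (!mergeState) {
            return; // workaround: every window keeps its own state
        }
        Long min = fireTime.get(target);
        for (String source : sources) {
            Long t = fireTime.remove(source);
            if (t != null && (min == null || t < min)) {
                min = t;
            }
        }
        if (min != null) {
            fireTime.put(target, min);
        }
    }

    /** Assumed to mirror a clear() that unboxes the fire time without a null check. */
    void clear(String window) {
        long t = fireTime.get(window); // NullPointerException if the state was merged away
        timers.getOrDefault(window, new HashSet<>()).remove(t);
        fireTime.remove(window);
    }

    boolean hasTimer(String window) {
        return !timers.getOrDefault(window, Collections.<Long>emptySet()).isEmpty();
    }

    /** Assumed operator sequence when a new element merges the sources into target. */
    void mergeThenElement(String target, long now, String... sources) {
        onMerge(target, Arrays.asList(sources));
        for (String source : sources) {
            clear(source);
        }
        onElement(target, now);
    }
}
```

For example, with an interval of 1000, elements at 1200 in w1 and 1400 in w2, and a merge into w12 at 1600: with mergeState = true, clear("w1") throws a NullPointerException, and w12 is left with a fire time of 2000 but no timer, so later onElement calls for w12 never register one. With mergeState = false, each old window still has its own state when clear runs, so its timer is deleted, and onElement for w12 registers a fresh timer at 2000.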



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)