Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/07/04 15:07:00 UTC

[jira] [Commented] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

    [ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073795#comment-16073795 ] 

Fabian Hueske commented on FLINK-7101:
--------------------------------------

That's a good find [~sunjincheng121] and a tricky question.

I think the best strategy is to completely ignore retraction records if {{inputCnt == 0}} or {{inputCnt == null}}. 
So I would actually exit the method before we modify the accumulator.
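The following is a minimal sketch of this strategy. It uses a simplified per-key model rather than Flink's actual {{GroupAggProcessFunction}}. The names {{Acc}}, {{Msg}}, and {{processElement}} are hypothetical. Cleaned-up state is modeled as {{None}}, and the aggregate is a SUM that tracks a row count. A retraction that finds no state, or a zero count, returns early before the accumulator is touched.

```scala
// Hypothetical, simplified model of a non-windowed SUM aggregate with retraction.
// Not Flink's GroupAggProcessFunction; all names here are illustrative.
object IgnoreRetractionSketch {

  // Accumulator: running sum plus the number of rows currently aggregated.
  final case class Acc(sum: Int, count: Long)

  // A changelog record: isAccumulate = false marks a retraction.
  final case class Msg(value: Int, isAccumulate: Boolean)

  /** Returns the new per-key state (None = no state) and the result to emit, if any. */
  def processElement(state: Option[Acc], msg: Msg): (Option[Acc], Option[Int]) = {
    val inputCnt = state.map(_.count).getOrElse(0L)
    // Proposed strategy: ignore a retraction for a key whose count is 0 or missing,
    // exiting before the accumulator is modified.
    if (!msg.isAccumulate && inputCnt == 0L) {
      (state, None)
    } else {
      val acc = state.getOrElse(Acc(0, 0L))
      val updated =
        if (msg.isAccumulate) Acc(acc.sum + msg.value, acc.count + 1)
        else Acc(acc.sum - msg.value, acc.count - 1)
      // Last row retracted: clear the state (emitting a delete message is not modeled here).
      if (updated.count == 0L) (None, None)
      else (Some(updated), Some(updated.sum))
    }
  }

  def main(args: Array[String]): Unit = {
    val (s1, out1) = processElement(None, Msg(5, isAccumulate = true))
    println(s"$s1 $out1") // Some(Acc(5,1)) Some(5)
    // The key's state is cleaned up after the retention time, so it is back to None.
    val (s2, out2) = processElement(None, Msg(5, isAccumulate = false))
    println(s"$s2 $out2") // None None
  }
}
```

Because the retraction is dropped before any accumulator update runs, the count in this model never goes negative. As a result, no NULL result is produced for the cleaned-up key.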

In the long run, this problem might not occur anymore. 
If all operators implement the state retention interval correctly, a key that has not been updated for some time should already have been cleaned up upstream before an upstream operator could send a retraction for it to an operator whose state was cleaned. An upstream operator would not be able to send a retraction after the cleanup interval, because its own state for that key would have been cleaned up first (not 100% sure about this, need to think about this a bit more). 
This would also mean that each operator has to send out updates even if the result does not change (the {{prevRow.row.equals(newRow.row)}} check should be removed because it might cause downstream state to be cleaned up too early).
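The argument for always emitting updates can be illustrated with a toy model. The model assumes, as the comment implies, that a downstream operator restarts its idle-state retention clock for a key whenever it receives a record for that key. {{Change}}, {{emitUpdate}}, and {{downstreamCleanupTime}} are hypothetical names, not Flink APIs. When the unchanged-result check is in place, an update that yields the same value sends nothing, so the downstream cleanup time is not pushed back.

```scala
// Toy model (hypothetical, not Flink's API): an upstream aggregate emits update pairs,
// and a downstream operator restarts its retention clock for a key on every received record.
object AlwaysEmitSketch {

  sealed trait Change
  final case class Insert(v: Int) extends Change
  final case class Retract(v: Int) extends Change

  /** Records sent downstream when a key's aggregate goes from prev to next. */
  def emitUpdate(prev: Option[Int], next: Int, suppressUnchanged: Boolean): Seq[Change] =
    prev match {
      case Some(p) if suppressUnchanged && p == next => Seq.empty // the equality check in question
      case Some(p) => Seq(Retract(p), Insert(next))
      case None    => Seq(Insert(next))
    }

  /** Downstream cleanup time for a key: last time it received a record plus the retention. */
  def downstreamCleanupTime(receiveTimes: Seq[Long], retention: Long): Long =
    receiveTimes.max + retention

  def main(args: Array[String]): Unit = {
    val retention = 10L
    // Upstream updates the key at t=0 (new value 5) and at t=8 (value unchanged, 5 -> 5).
    def receiveTimes(suppress: Boolean): Seq[Long] =
      Seq(0L -> emitUpdate(None, 5, suppress), 8L -> emitUpdate(Some(5), 5, suppress))
        .collect { case (t, msgs) if msgs.nonEmpty => t }
    println(downstreamCleanupTime(receiveTimes(suppress = true), retention))  // 10
    println(downstreamCleanupTime(receiveTimes(suppress = false), retention)) // 18
  }
}
```

Consider a retention of 10 and an unchanged update at t=8. With the check, the downstream key is cleaned at t=10. Without it, the key is cleaned at t=18. In both cases the upstream operator last touched the key at t=8.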

> Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7101
>                 URL: https://issues.apache.org/jira/browse/FLINK-7101
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>             Fix For: 1.4.0
>
>
> When a non-windowed group aggregate is used with the {{minIdleStateRetentionTime}} config and a retractable aggregate function, it emits a "NULL" aggregate value, which we do not expect. 
> For example: ({{IntSumWithRetractAggFunction}})
> 1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true) 
> 2. Cleanup state
> 3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false)  // acc.f1 = -1, getValue= null 
> So, we must change the logic of {{GroupAggProcessFunction}} as follows:
> {code}
> if (inputCnt != 0) {
>   ...
> } else {
>   ...
> }
> {code}
> to
> {code}
> if (inputCnt > 0) {
>   ...
> } else {
>   if (null != prevRow.row) {
>     ...
>   }
> }
> {code}
> In this case, the result will be bigger than expected, but I think that makes sense, because the user wants the state to be cleaned up (and should know the impact).
> What do you think? [~fhueske] [~hequn8128]
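The reported sequence can be replayed with a small stand-in for {{IntSumWithRetractAggFunction}}. {{SumAcc}}, {{accumulate}}, {{retract}}, and {{getValue}} here are hypothetical simplifications, not Flink's classes. Two behaviors are assumptions chosen to match step 3 of the description. First, {{getValue}} returns null (modeled as {{None}}) unless {{f1 > 0}}. Second, cleanup resets both the accumulator and the input counter.

```scala
// Hypothetical replay of the reported scenario; a simplified stand-in for
// IntSumWithRetractAggFunction, not Flink's actual class.
object RetractAfterCleanupSketch {

  // f0 = running sum, f1 = number of accumulated rows (mirrors acc.f0 / acc.f1 above).
  final class SumAcc { var f0: Int = 0; var f1: Long = 0L }

  def accumulate(acc: SumAcc, v: Int): Unit = { acc.f0 += v; acc.f1 += 1 }
  def retract(acc: SumAcc, v: Int): Unit = { acc.f0 -= v; acc.f1 -= 1 }

  // Assumption: the result is null (None here) unless at least one row is accumulated.
  def getValue(acc: SumAcc): Option[Int] = if (acc.f1 > 0) Some(acc.f0) else None

  def main(args: Array[String]): Unit = {
    // 1. Receive (5, true): the key has one row and a sum of 5.
    var acc = new SumAcc
    var inputCnt = 0L
    accumulate(acc, 5); inputCnt += 1
    // 2. Cleanup: state and counter are dropped, so fresh values are created.
    acc = new SumAcc
    inputCnt = 0L
    // 3. Receive (5, false): the retraction hits the fresh accumulator.
    retract(acc, 5); inputCnt -= 1
    println(s"f1=${acc.f1}, getValue=${getValue(acc)}") // f1=-1, getValue=None
    println(inputCnt != 0) // true: the old condition treats the key as still having rows
    println(inputCnt > 0)  // false: the proposed condition falls through to the else branch
  }
}
```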



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)