Posted to issues@flink.apache.org by "sunjincheng (JIRA)" <ji...@apache.org> on 2017/07/04 13:34:00 UTC
[jira] [Updated] (FLINK-7101) Fix Non-windowed group-aggregate
error when using `minIdleStateRetentionTime` config and retract agg
[ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sunjincheng updated FLINK-7101:
-------------------------------
Description:
When a non-windowed group aggregate is used with the {{minIdleStateRetentionTime}} config and a retract aggregate function, it emits a "NULL" aggregate value, which we do not expect.
For example: ({{IntSumWithRetractAggFunction}})
1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true)
2. Cleanup state
3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false) // acc.f1 = -1, getValue= null
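The three steps above can be reproduced with a small stand-alone sketch. The class below is a simplified stand-in, not Flink's {{IntSumWithRetractAggFunction}}: the fields sum and count play the roles of acc.f0 and acc.f1, and it assumes that getValue only returns a value while the count is positive.

```java
// Simplified stand-in for a sum-with-retract accumulator (hypothetical, not Flink code).
public class RetractSumSketch {
    long sum = 0;    // plays the role of acc.f0
    long count = 0;  // plays the role of acc.f1

    void accumulate(int value) {
        sum += value;
        count++;
    }

    void retract(int value) {
        sum -= value;
        count--;
    }

    // Assumption: a result is only defined while at least one row is accumulated.
    Long getValue() {
        return count > 0 ? Long.valueOf(sum) : null;
    }

    public static void main(String[] args) {
        RetractSumSketch acc = new RetractSumSketch();
        acc.accumulate(5);             // step 1: accumulate message (true)
        acc = new RetractSumSketch();  // step 2: state cleanup discards the accumulator
        acc.retract(5);                // step 3: retract message (false) hits fresh state
        System.out.println("count=" + acc.count + ", value=" + acc.getValue());
        // prints "count=-1, value=null"
    }
}
```

Because the cleanup discards the earlier accumulate, the retraction is applied to an empty accumulator, which is how the count ends up negative.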
So, we must change the logic of {{GroupAggProcessFunction}} as follows:
{code}
if (inputCnt != 0) {
  ...
} else {
  ...
}
{code}
TO
{code}
if (inputCnt > 0) {
  ...
} else {
  if (null != prevRow.row) {
    ...
  }
}
{code}
In this case, the result will be larger than expected, but I think that makes sense because the user asked for the state to be cleaned up (they should be aware of the impact).
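As a rough illustration of the proposed guard, the sketch below models only the emit decision. The method name emit, the string rows, and the "+"/"-" prefixes are hypothetical. It assumes that the first branch emits the updated result and that the else branch emits a delete for the previously emitted row; the elided bodies in the snippets above may do more than this.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the proposed guard; not the actual GroupAggProcessFunction.
public class GroupAggGuardSketch {

    // Returns the messages that would be emitted for one key.
    // prevRow is null when nothing was emitted since the last state cleanup.
    static List<String> emit(long inputCnt, String prevRow, String newRow) {
        List<String> out = new ArrayList<>();
        if (inputCnt > 0) {
            // Rows are still accumulated: emit the updated result.
            out.add("+" + newRow);
        } else {
            if (null != prevRow) {
                // A result was emitted earlier: delete it.
                out.add("-" + prevRow);
            }
            // Otherwise (for example a retraction after cleanup): emit nothing.
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(1, null, "aaa,5"));   // prints "[+aaa,5]"
        System.out.println(emit(0, "aaa,5", null));   // prints "[-aaa,5]"
        System.out.println(emit(-1, null, null));     // prints "[]"
    }
}
```

With the original check (inputCnt != 0), the last call would take the first branch, which corresponds to the unexpected "NULL" result described above.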
What do you think? [~fhueske] [~hequn8128]
was:
When a non-windowed group aggregate is used with the {{minIdleStateRetentionTime}} config and a retract aggregate function, it emits a "NULL" aggregate value, which we do not expect.
For example: ({{IntSumWithRetractAggFunction}})
1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true)
2. Cleanup state
3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false) // acc.f1 = -1, getValue= null
So, we must change the logic of {{GroupAggProcessFunction}} as follows:
{code}
if (inputCnt != 0) {
...
} else {
...
}
{code}
TO
{code}
if (inputCnt > 0) {
...
} else {
if( null != prevRow.row){
...
}
}
{code}
What do you think? [~fhueske] [~hequn8128]
> Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-7101
> URL: https://issues.apache.org/jira/browse/FLINK-7101
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0, 1.3.1
> Reporter: sunjincheng
> Assignee: sunjincheng
> Fix For: 1.4.0