Posted to issues@flink.apache.org by "sunjincheng (JIRA)" <ji...@apache.org> on 2017/07/05 07:14:00 UTC

[jira] [Comment Edited] (FLINK-7101) Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg

    [ https://issues.apache.org/jira/browse/FLINK-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074330#comment-16074330 ] 

sunjincheng edited comment on FLINK-7101 at 7/5/17 7:13 AM:
------------------------------------------------------------

Hi [~fhueske], I think:
1. We need the retraction records when {{inputCnt == 0}};
2. For the current Table API/SQL, we should ignore retraction records when {{inputCnt < 0}} (which happens after state cleanup);
3. You are right, we should change the condition {{if (prevRow.row.equals(newRow.row))}} to {{if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled)}}.
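
To make point 3 concrete, here is a minimal sketch of the proposed guard. {{EmitGuard}} and {{shouldEmit}} are hypothetical names used only for illustration, and the sketch assumes the guarded branch in {{GroupAggProcessFunction}} skips emission when it is taken; it is not the actual operator code.

```scala
// Hypothetical helper, not part of Flink.
// Assumption: the guarded branch skips emitting the row when its condition holds.
object EmitGuard {
  def shouldEmit[R](prevRow: R, newRow: R, stateCleaningEnabled: Boolean): Boolean = {
    val unchanged = prevRow == newRow
    // Proposed condition: skip only if the row is unchanged AND state cleaning is disabled.
    val skip = unchanged && !stateCleaningEnabled
    !skip
  }
}
```

Under this sketch an unchanged row is skipped only while state cleaning is disabled; once it is enabled, the same row is forwarded again.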

BTW, if we can set the parallelism of an operator (in the future), we also need to change the {{getValue}} logic of the current built-in {{SumWithRetractAggFunction}}:
{code}
override def getValue(acc: SumWithRetractAccumulator[T]): T = {
  // Proposed change: replace `acc.f1 > 0` with `acc.f1 != 0`.
  if (acc.f1 > 0) {
    acc.f0
  } else {
    null.asInstanceOf[T]
  }
}
{code}
The reason is:
!screenshot-1.png!

What do you think?
*Note: the sum AGG is not used in the example above; the sketch just shows why we need to change the sum AGG logic.*
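
The difference between the two checks can be sketched with a simplified stand-in for the accumulator. {{SumAcc}} and {{RetractSum}} are hypothetical names, not Flink classes; the only thing taken from the discussion is that {{f0}} holds the sum, {{f1}} holds the count, and a retraction on a fresh accumulator leaves {{f1 = -1}}.

```scala
// Simplified stand-in for a retractable sum accumulator (assumption, not Flink's class).
// f0: running sum; f1: number of accumulated values minus number of retracted values.
final class SumAcc(var f0: Long = 0L, var f1: Long = 0L)

object RetractSum {
  def accumulate(acc: SumAcc, v: Long): Unit = { acc.f0 += v; acc.f1 += 1 }
  def retract(acc: SumAcc, v: Long): Unit = { acc.f0 -= v; acc.f1 -= 1 }

  // Current check: a value is returned only while the count is positive.
  def getValueCurrent(acc: SumAcc): Option[Long] =
    if (acc.f1 > 0) Some(acc.f0) else None

  // Proposed check: any non-zero count returns the sum.
  def getValueProposed(acc: SumAcc): Option[Long] =
    if (acc.f1 != 0) Some(acc.f0) else None
}
```

If {{retract}} is the first call on a fresh accumulator, the count becomes -1: the current check returns no value, while the proposed check returns the (negative) partial sum. {{Option}} is used here instead of {{null}} only to keep the sketch type-safe.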



> Fix Non-windowed group-aggregate error when using `minIdleStateRetentionTime` config and retract agg
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7101
>                 URL: https://issues.apache.org/jira/browse/FLINK-7101
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0, 1.3.1
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>             Fix For: 1.4.0
>
>         Attachments: screenshot-1.png
>
>
> When a non-windowed group-aggregate uses the {{minIdleStateRetentionTime}} config together with a retract AGG, it emits a "NULL" agg value, which we do not expect.
> For example: ({{IntSumWithRetractAggFunction}})
> 1. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), true) 
> 2. Cleanup state
> 3. Receive: CRow(Row.of(6L: JLong, 5: JInt, "aaa"), false)  // acc.f1 = -1, getValue= null 
> So, we must change the logic of {{GroupAggProcessFunction}} as follows:
> {code}
> if (inputCnt != 0) {
>   ...
> } else {
>   ...
> }
> {code}
> to:
> {code}
> if (inputCnt > 0) {
>   ...
> } else {
>   if (null != prevRow.row) {
>     ...
>   }
> }
> {code}
> In this case, the result will be bigger than expected, but I think that makes sense, because the user asked for state cleanup (they should know the impact).
> What do you think? [~fhueske] [~hequn8128]
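
As a rough illustration of the branching proposed in the description, the following sketch only classifies which branch a record would take; it does not fill in the elided branch bodies. {{BranchSketch}} and its members are hypothetical names, and the sketch assumes that after state cleanup {{inputCnt}} restarts at 0 and no previous row is stored.

```scala
// Hypothetical model of the branch selection only; not the Flink operator.
object BranchSketch {
  sealed trait Branch
  case object CountPositive extends Branch   // if (inputCnt > 0) { ... }
  case object ElseWithPrevRow extends Branch // else { if (null != prevRow.row) { ... } }
  case object ElseNoPrevRow extends Branch   // else branch, inner guard is false

  // Old condition from the description: any non-zero count takes the first branch.
  def oldTakesFirstBranch(inputCnt: Long): Boolean = inputCnt != 0

  // Proposed condition: only a positive count takes the first branch.
  // prevRow is modelled as an Option instead of a nullable field.
  def proposedBranch(inputCnt: Long, prevRow: Option[String]): Branch =
    if (inputCnt > 0) CountPositive
    else if (prevRow.isDefined) ElseWithPrevRow
    else ElseNoPrevRow
}
```

For step 3 of the example (a retraction after cleanup, so {{inputCnt = -1}} and no previous row), the old condition would still enter the first branch, whereas the proposed one falls through to the else branch with the inner guard false.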


