Posted to issues@flink.apache.org by "xuyang (Jira)" <ji...@apache.org> on 2024/01/05 03:35:00 UTC

[jira] [Commented] (FLINK-33936) The aggregation of mini-batches should output the result even if the result is the same as before when TTL is configured.

    [ https://issues.apache.org/jira/browse/FLINK-33936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803395#comment-17803395 ] 

xuyang commented on FLINK-33936:
--------------------------------

Thanks for the report. This looks like a bug: when the mini-batch aggregation optimizations were first introduced, some of their behaviors were not aligned with those of the group agg function. I think we need to fix it.

> The aggregation of mini-batches should output the result even if the result is the same as before when TTL is configured.
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33936
>                 URL: https://issues.apache.org/jira/browse/FLINK-33936
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.0
>            Reporter: Feng Jin
>            Priority: Major
>
> Currently, if mini-batch is enabled and the aggregated result is the same as the previous output, the current aggregation result is not sent downstream. As a result, downstream nodes do not receive an update, and if a TTL is set on state, the TTL of the downstream state is not refreshed either.
> The specific logic is as follows.
> https://github.com/apache/flink/blob/a18c0cd3f0cdfd7e0acb53283f40cd2033a86472/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGroupAggFunction.java#L224
> {code:java}
>                     if (!equaliser.equals(prevAggValue, newAggValue)) {
>                         // new row is not same with prev row
>                         if (generateUpdateBefore) {
>                             // prepare UPDATE_BEFORE message for previous row
>                             resultRow
>                                     .replace(currentKey, prevAggValue)
>                                     .setRowKind(RowKind.UPDATE_BEFORE);
>                             out.collect(resultRow);
>                         }
>                         // prepare UPDATE_AFTER message for new row
>                         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>                         out.collect(resultRow);
>                     }
>                     // new row is same with prev row, no need to output
> {code}
> When mini-batch is not enabled, the new result is still sent if TTL is set, even when the aggregation result is the same as the previous one.
> https://github.com/apache/flink/blob/e9353319ad625baa5b2c20fa709ab5b23f83c0f4/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L170
> {code:java}
>                 if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
>                     // newRow is the same as before and state cleaning is not enabled.
>                     // We do not emit retraction and acc message.
>                     // If state cleaning is enabled, we have to emit messages to prevent too early
>                     // state eviction of downstream operators.
>                     return;
>                 } else {
>                     // retract previous result
>                     if (generateUpdateBefore) {
>                         // prepare UPDATE_BEFORE message for previous row
>                         resultRow
>                                 .replace(currentKey, prevAggValue)
>                                 .setRowKind(RowKind.UPDATE_BEFORE);
>                         out.collect(resultRow);
>                     }
>                     // prepare UPDATE_AFTER message for new row
>                     resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
>                 }
> {code}
> Therefore, considering TTL scenarios, I believe that when mini-batch aggregation is enabled, the new result should also be emitted even when it is the same as the previous one.
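
The condition described above can be sketched in isolation. This is only a minimal illustration of the emit decision, not the actual patch: the class MiniBatchEmitSketch and the method shouldEmit are hypothetical names, and it assumes the mini-batch function would have access to a stateRetentionTime value with the same meaning as in GroupAggFunction (a value <= 0 means state cleaning is disabled).

```java
// Hypothetical, self-contained sketch of the emit decision; not Flink code.
class MiniBatchEmitSketch {

    /**
     * Returns true when the new aggregate should be sent downstream.
     *
     * @param stateRetentionTime assumed TTL setting; <= 0 means state cleaning is disabled
     * @param sameAsPrevious     whether the new aggregate equals the previously emitted one
     */
    static boolean shouldEmit(long stateRetentionTime, boolean sameAsPrevious) {
        // Suppress output only when the result is unchanged AND no TTL is configured.
        // With a TTL, an unchanged result is still emitted so that downstream
        // operators see an update for the key and do not evict its state too early.
        return !(stateRetentionTime <= 0 && sameAsPrevious);
    }

    public static void main(String[] args) {
        System.out.println("no TTL, unchanged: " + shouldEmit(0L, true));
        System.out.println("no TTL, changed:   " + shouldEmit(0L, false));
        System.out.println("TTL,    unchanged: " + shouldEmit(3_600_000L, true));
        System.out.println("TTL,    changed:   " + shouldEmit(3_600_000L, false));
    }
}
```

Only the first case (no TTL, unchanged result) is suppressed, which mirrors the check quoted from GroupAggFunction; the mini-batch snippet quoted above currently suppresses the third case as well.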



--
This message was sent by Atlassian Jira
(v8.20.10#820010)