Posted to user@flink.apache.org by wang guanglei <gl...@outlook.com> on 2022/02/11 06:05:20 UTC

There Is a Delay While Over Aggregation Sending Results

Hey Flink Community,

I am using Flink SQL Over Aggregation <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/over-agg/> to count the distinct UUIDs per client IP over the past hour.
The Flink SQL I am using is roughly the following:
SELECT
  COUNT(DISTINCT consumer_consumerUuid) OVER w AS feature_value,
  clientIp AS entity_id
FROM wide_table
WINDOW w AS (
  PARTITION BY clientIp
  ORDER BY ts
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
From the documentation, we know that OVER aggregates produce an aggregated value for every input row, which (as I understand it) means the calculation is triggered by every input event in wide_table rather than by the watermark. Is that right?
However, my logs show a consistent delay of about 5-60 seconds between an input row arriving and the window result being emitted.

The data volume is small: there are only about 1k records/hour in wide_table, and fewer than 10 consumers per clientIp.

Is this delay normal, or is there something wrong with how I am using it?

Thanks.

Re: There Is a Delay While Over Aggregation Sending Results

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Did you define a watermark on ts? If so, the result for a row is produced
only after the watermark passes that row's time, which causes the delay.
See [1] for details.

[1]
https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/RowTimeRowsBoundedPrecedingFunction.java#L179
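
For illustration, here is a minimal sketch of the two kinds of table definition this answer distinguishes. The original post does not show its DDL, so the column types, the 5-second watermark bound, the datagen connector, and the names wide_table_proctime and proc are all assumptions made for the example.

```sql
-- Event-time variant (assumed DDL; not taken from the original post).
-- With a watermark declared on ts, an OVER aggregation ordered by ts holds
-- each row back until the watermark has passed that row's timestamp.
CREATE TABLE wide_table (
  consumer_consumerUuid STRING,
  clientIp              STRING,
  ts                    TIMESTAMP(3),
  -- Assumed bound: the watermark trails the largest ts seen so far by 5 seconds.
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'  -- placeholder connector for this sketch
);

-- Processing-time variant (hypothetical table and column names).
-- proc is a processing-time attribute, so no watermark is involved.
CREATE TABLE wide_table_proctime (
  consumer_consumerUuid STRING,
  clientIp              STRING,
  proc AS PROCTIME()
) WITH (
  'connector' = 'datagen'  -- placeholder connector for this sketch
);

-- Same query as in the question, but ordered by the processing-time attribute.
SELECT
  COUNT(DISTINCT consumer_consumerUuid) OVER w AS feature_value,
  clientIp AS entity_id
FROM wide_table_proctime
WINDOW w AS (
  PARTITION BY clientIp
  ORDER BY proc
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
);
```

With a definition like the first one, the watermark advances only as later rows arrive, so at roughly 1k records/hour a row may wait several seconds for subsequent rows to push the watermark past its timestamp. Ordering by a processing-time attribute should avoid that wait, at the cost of results that depend on arrival time rather than on ts.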
