Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2020/12/10 17:48:42 UTC

Help on the Split Distinct Aggregation from Table API

Hi,

I am trying to understand and reproduce the "Split Distinct
Aggregation" optimization [1] in the Table API. I am executing the query:

SELECT driverId, COUNT(DISTINCT dayOfTheYear) FROM TaxiRide GROUP BY driverId

on the TaxiRide data from the Flink exercises. As mentioned in
link [1], the "Split Distinct Aggregation" optimization performs
well when the data in the distinct column is sparse, which in my
case I take to be "driverId". By sparse I understand that the row is
still processed by the query, but its "driverId" value is null or 0.
Am I correct?
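
For reference, a minimal plain-Python sketch (no Flink involved) of the
two-level rewrite that link [1] describes, as I read it: the first level
groups by the original key plus a bucket derived from the distinct key,
and the second level sums the partial counts. The function names, the
sample rows, and the use of Python's hash() are my own assumptions for
illustration; the bucket count of 1024 is what I believe the default of
table.optimizer.distinct-agg.split.bucket-num to be.

```python
from collections import defaultdict

# Assumed default bucket count (table.optimizer.distinct-agg.split.bucket-num).
NUM_BUCKETS = 1024


def count_distinct_direct(rows):
    """One-level aggregation: a single set of days per driverId."""
    seen = defaultdict(set)
    for driver_id, day in rows:
        seen[driver_id].add(day)
    return {driver_id: len(days) for driver_id, days in seen.items()}


def count_distinct_split(rows, num_buckets=NUM_BUCKETS):
    """Two-level aggregation: partial distinct counts per bucket, then a sum."""
    # Level 1: group by (driverId, bucket of the distinct key).
    partial = defaultdict(set)
    for driver_id, day in rows:
        bucket = hash(day) % num_buckets
        partial[(driver_id, bucket)].add(day)
    # Level 2: sum the partial counts per driverId. A given day always
    # lands in the same bucket, so no value is counted twice.
    totals = defaultdict(int)
    for (driver_id, _bucket), days in partial.items():
        totals[driver_id] += len(days)
    return dict(totals)


# Hypothetical (driverId, dayOfTheYear) rows; driverId 0 is the hot key.
rows = [(0, 1), (0, 2), (0, 2), (0, 340), (7, 1), (7, 1), (9, 15)]
direct = count_distinct_direct(rows)
split = count_distinct_split(rows, num_buckets=4)
```

Both functions return the same counts; the difference is only that the
work for a hot driverId is spread over up to num_buckets partial groups
at the first level.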

So I created a second data source file in which only 10% of the rows
have a valid "driverId"; the others are filled with the value 0. I
have 8 parallel data sources, each generating data at 25K rec/sec
(200K rec/sec in total). This is the data rate at which I was getting
backpressure when executing count/sum/avg with the Table API, and
where I then used mini-batch and 2-phases to decrease the backpressure.
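
A rough sketch of how such a file could be derived, assuming rows are
simple (driverId, dayOfTheYear) pairs rather than the full TaxiRide
record. The helper name sparsify_driver_ids, the keep ratio parameter,
and the fixed seed are hypothetical and only serve the illustration.

```python
import random


def sparsify_driver_ids(rows, keep_ratio=0.1, seed=42):
    """Keep the real driverId on roughly keep_ratio of the rows, else use 0."""
    rng = random.Random(seed)  # fixed seed so the output is reproducible
    result = []
    for driver_id, day in rows:
        if rng.random() < keep_ratio:
            result.append((driver_id, day))
        else:
            result.append((0, day))
    return result


# Hypothetical input: 10,000 rows with driverIds 1..100 and days 1..365.
original = [(1 + i % 100, 1 + i % 365) for i in range(10_000)]
sparse = sparsify_driver_ids(original)
kept_fraction = sum(1 for driver_id, _ in sparse if driver_id != 0) / len(sparse)
```

Note that this turns driverId 0 into a single very hot grouping key,
which is a different thing from the distinct column having many
different values.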

I was expecting that the query without any optimization
(mini-batch/2-phases) would get high backpressure; that switching to
mini-batch, and then to 2-phases, would bring some improvement but
still with backpressure; and that switching to the split optimization
would give low backpressure.
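
For clarity, the four setups being compared, written as plain Python
dictionaries of the option keys from the tuning page [1]. The latency
and size values are the illustrative ones used in the documentation,
not the values from my job, and the dictionary names are my own.

```python
# No optimization: all options left at their defaults.
BASELINE = {}

# Mini-batch: buffer input records before triggering the aggregation.
MINI_BATCH = {
    "table.exec.mini-batch.enabled": "true",
    "table.exec.mini-batch.allow-latency": "5 s",  # illustrative value
    "table.exec.mini-batch.size": "5000",          # illustrative value
}

# 2-phases (local-global) aggregation on top of mini-batch.
TWO_PHASE = {
    **MINI_BATCH,
    "table.optimizer.agg-phase-strategy": "TWO_PHASE",
}

# Split distinct aggregation on top of the previous settings.
SPLIT_DISTINCT = {
    **TWO_PHASE,
    "table.optimizer.distinct-agg.split.enabled": "true",
}

# In a real job each pair would be set on the table environment's
# configuration as a string key and value; that call is not shown here
# because it needs a Flink installation.
SETUPS = [BASELINE, MINI_BATCH, TWO_PHASE, SPLIT_DISTINCT]
```

Each setup is a superset of the previous one, so a change in
backpressure between two runs can be attributed to a single added
option.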

Is there something wrong with my query or my data?
Thanks,
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Help on the Split Distinct Aggregation from Table API

Posted by Felipe Gutierrez <fe...@gmail.com>.
I just realized that I have to use dayOfTheYear in the GROUP BY. I will
test again.

