Posted to user@flink.apache.org by sunfulin <su...@163.com> on 2020/01/08 07:39:17 UTC

Flink SQL Count Distinct performance optimization

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In one scenario I'm trying to count(distinct deviceID) over a data set of about 100GB in realtime and sink the aggregated results to an Elasticsearch index. I met a severe performance issue when running my Flink job and want to get some help from the community.


Flink version: 1.8.2
Running on YARN with 4 slots per TaskManager. My Flink task parallelism is set to 10, which equals the number of my Kafka source partitions. After running the job, I can observe high backpressure from the Flink dashboard. Any suggestions and help are highly appreciated.


The running SQL is as follows:

INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
FROM (
    SELECT
        aggId,
        pageId,
        statkey,
        COUNT(DISTINCT deviceId) AS cnt
    FROM (
        SELECT
            'ZL_005' AS aggId,
            'ZL_UV_PER_MINUTE' AS pageId,
            deviceId,
            ts2Date(recvTime) AS statkey
        FROM kafka_zl_etrack_event_stream
    )
    GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
) AS t1
GROUP BY aggId, pageId, statkey
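
(Note: ts2Date is a user-defined function whose implementation is not shown in this thread. A hypothetical sketch of such a scalar UDF, assuming recvTime is an epoch-millisecond timestamp and a per-minute stat key is wanted, might look like this in Flink 1.8:)

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical ts2Date: truncates an epoch-millisecond timestamp to a
// per-minute stat key. The date format here is an assumption.
public class Ts2Date extends ScalarFunction {
    public String eval(long recvTime) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        return fmt.format(new Date(recvTime));
    }
}

(It would be registered before the query is submitted, e.g. tableEnv.registerFunction("ts2Date", new Ts2Date());)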

Best

Re:Re: Flink SQL Count Distinct performance optimization

Posted by sunfulin <su...@163.com>.

hi godfreyhe,
As far as I can tell, I have rewritten the running SQL from a single-level count distinct into a two-level aggregation, which is effectively what the table.optimizer.distinct-agg.split.enabled parameter does. Correct me if I am describing this the wrong way. But the rewritten SQL does not improve the throughput much.
For now I am not able to use the blink planner in my apps, because the current prod environment has no plan and is not ready to upgrade to Flink 1.9+.

At 2020-01-08 15:52:28, "贺小令" <go...@gmail.com> wrote:

hi sunfulin,
you can try the blink planner (since 1.9+), which optimizes distinct aggregation. You can also try to enable table.optimizer.distinct-agg.split.enabled if the data is skewed.


best,
godfreyhe


sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 3:39 PM:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In one scenario I'm trying to count(distinct deviceID) over a data set of about 100GB in realtime and sink the aggregated results to an Elasticsearch index. I met a severe performance issue when running my Flink job and want to get some help from the community.


Flink version: 1.8.2
Running on YARN with 4 slots per TaskManager. My Flink task parallelism is set to 10, which equals the number of my Kafka source partitions. After running the job, I can observe high backpressure from the Flink dashboard. Any suggestions and help are highly appreciated.


The running SQL is as follows:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
FROM (
    SELECT
        aggId,
        pageId,
        statkey,
        COUNT(DISTINCT deviceId) AS cnt
    FROM (
        SELECT
            'ZL_005' AS aggId,
            'ZL_UV_PER_MINUTE' AS pageId,
            deviceId,
            ts2Date(recvTime) AS statkey
        FROM kafka_zl_etrack_event_stream
    )
    GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
) AS t1
GROUP BY aggId, pageId, statkey

Best

Re:Re: Re: Flink SQL Count Distinct performance optimization

Posted by sunfulin <su...@163.com>.
hi,
Thanks for the reply. I am using the default FsStateBackend rather than RocksDB, with checkpointing turned off, so I really cannot see any state info from the dashboard. I will dig into more details and see whether anything can be optimized.
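
(For reference, a minimal sketch of switching to RocksDB with checkpointing enabled, so state becomes durable and visible in the dashboard; the checkpoint URI is a placeholder, and flink-statebackend-rocksdb must be on the classpath:)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 60 seconds so state is persisted regularly.
        env.enableCheckpointing(60_000);
        // Placeholder URI; point this at a real HDFS/S3 checkpoint directory.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}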

At 2020-01-08 19:07:08, "Benchao Li" <li...@gmail.com> wrote:
>hi sunfulin,
>
>As Kurt pointed out, if you use RocksDB state backend, maybe slow disk IO
>bound your job.
>You can check WindowOperator's latency metric to see how long it takes to
>process an element.
>Hope this helps.
>
>sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 4:04 PM:
>
>> Ah, I had checked resource usage and GC from the Flink dashboard. It seems
>> the reason is not a CPU or memory issue. Task heap memory usage is less than
>> 30%. Could you kindly tell me how I can see more metrics to help target
>> the bottleneck?
>> Really appreciated that.
>>
>>
>>
>>
>>
>> At 2020-01-08 15:59:17, "Kurt Young" <yk...@gmail.com> wrote:
>>
>> Hi,
>>
>> Could you try to find out what's the bottleneck of your current job? This
>> would lead to
>> different optimizations, such as whether it's CPU bound, or you have too
>> big a local
>> state and are thus stuck on too many slow IOs.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Jan 8, 2020 at 3:53 PM 贺小令 <go...@gmail.com> wrote:
>>
>>> hi sunfulin,
>>> you can try the blink planner (since 1.9+), which optimizes distinct
>>> aggregation. You can also try to enable
>>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>>>
>>> best,
>>> godfreyhe
>>>
>>> sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 3:39 PM:
>>>
>>>> Hi, community,
>>>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>>>> With one scenario I'm trying to count(distinct deviceID) over about 100GB
>>>> data set in realtime, and aggregate results with sink to ElasticSearch
>>>> index. I met a severe performance issue when running my Flink job and want
>>>> to get some help from the community.
>>>>
>>>>
>>>> Flink version : 1.8.2
>>>> Running on yarn with 4 yarn slots per task manager. My flink task
>>>> parallelism is set to be 10, which is equal to my kafka source partitions.
>>>> After running the job, I can observe high backpressure from the Flink
>>>> dashboard. Any suggestions and help are highly appreciated.
>>>>
>>>>
>>>> running sql is like the following:
>>>>
>>>>
>>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>>>
>>>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>>>> clkCnt  from
>>>>
>>>> (
>>>>
>>>>     SELECT
>>>>
>>>>      aggId,
>>>>
>>>>      pageId,
>>>>
>>>>      statkey,
>>>>
>>>>      COUNT(DISTINCT deviceId) as cnt
>>>>
>>>>      FROM
>>>>
>>>>      (
>>>>
>>>>          SELECT
>>>>
>>>>          'ZL_005' as aggId,
>>>>
>>>>          'ZL_UV_PER_MINUTE' as pageId,
>>>>
>>>>          deviceId,
>>>>
>>>>          ts2Date(recvTime) as statkey
>>>>
>>>>          from
>>>>
>>>>          kafka_zl_etrack_event_stream
>>>>
>>>>      )
>>>>
>>>>      GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>>
>>>> ) as t1
>>>>
>>>> group by aggId, pageId, statkey
>>>>
>>>> Best
>>>
>>>
>>
>>
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re: Re: Flink SQL Count Distinct performance optimization

Posted by Benchao Li <li...@gmail.com>.
hi sunfulin,

As Kurt pointed out, if you use RocksDB state backend, maybe slow disk IO
bound your job.
You can check WindowOperator's latency metric to see how long it takes to
process an element.
Hope this helps.
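
(Note that operator latency histograms are only reported when latency tracking is enabled; a minimal sketch, keeping in mind that tracking adds some overhead and the 5-second interval is just an example:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Emit latency markers every 5 seconds so per-operator latency
        // histograms appear in the job's metrics.
        env.getConfig().setLatencyTrackingInterval(5_000);
    }
}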

sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 4:04 PM:

> Ah, I had checked resource usage and GC from the Flink dashboard. It seems
> the reason is not a CPU or memory issue. Task heap memory usage is less than
> 30%. Could you kindly tell me how I can see more metrics to help target
> the bottleneck?
> Really appreciated that.
>
>
>
>
>
> At 2020-01-08 15:59:17, "Kurt Young" <yk...@gmail.com> wrote:
>
> Hi,
>
> Could you try to find out what's the bottleneck of your current job? This
> would lead to
> different optimizations, such as whether it's CPU bound, or you have too
> big a local
> state and are thus stuck on too many slow IOs.
>
> Best,
> Kurt
>
>
> On Wed, Jan 8, 2020 at 3:53 PM 贺小令 <go...@gmail.com> wrote:
>
>> hi sunfulin,
>> you can try the blink planner (since 1.9+), which optimizes distinct
>> aggregation. You can also try to enable
>> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>>
>> best,
>> godfreyhe
>>
>> sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 3:39 PM:
>>
>>> Hi, community,
>>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>>> With one scenario I'm trying to count(distinct deviceID) over about 100GB
>>> data set in realtime, and aggregate results with sink to ElasticSearch
>>> index. I met a severe performance issue when running my Flink job and want
>>> to get some help from the community.
>>>
>>>
>>> Flink version : 1.8.2
>>> Running on yarn with 4 yarn slots per task manager. My flink task
>>> parallelism is set to be 10, which is equal to my kafka source partitions.
>>> After running the job, I can observe high backpressure from the Flink
>>> dashboard. Any suggestions and help are highly appreciated.
>>>
>>>
>>> running sql is like the following:
>>>
>>>
>>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>>
>>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>>> clkCnt  from
>>>
>>> (
>>>
>>>     SELECT
>>>
>>>      aggId,
>>>
>>>      pageId,
>>>
>>>      statkey,
>>>
>>>      COUNT(DISTINCT deviceId) as cnt
>>>
>>>      FROM
>>>
>>>      (
>>>
>>>          SELECT
>>>
>>>          'ZL_005' as aggId,
>>>
>>>          'ZL_UV_PER_MINUTE' as pageId,
>>>
>>>          deviceId,
>>>
>>>          ts2Date(recvTime) as statkey
>>>
>>>          from
>>>
>>>          kafka_zl_etrack_event_stream
>>>
>>>      )
>>>
>>>      GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>>
>>> ) as t1
>>>
>>> group by aggId, pageId, statkey
>>>
>>>
>>> Best
>>
>>
>
>
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Re:Re: Flink SQL Count Distinct performance optimization

Posted by sunfulin <su...@163.com>.
Ah, I had checked resource usage and GC from the Flink dashboard. It seems the reason is not a CPU or memory issue. Task heap memory usage is less than 30%. Could you kindly tell me how I can see more metrics to help target the bottleneck?
Really appreciate it.

At 2020-01-08 15:59:17, "Kurt Young" <yk...@gmail.com> wrote:

Hi,


Could you try to find out what's the bottleneck of your current job? This would lead to
different optimizations, such as whether it's CPU bound, or you have too big a local
state and are thus stuck on too many slow IOs.


Best,

Kurt





On Wed, Jan 8, 2020 at 3:53 PM 贺小令 <go...@gmail.com> wrote:

hi sunfulin,
you can try the blink planner (since 1.9+), which optimizes distinct aggregation. You can also try to enable table.optimizer.distinct-agg.split.enabled if the data is skewed.


best,
godfreyhe


sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 3:39 PM:

Hi, community,
I'm using Apache Flink SQL to build some of my realtime streaming apps. In one scenario I'm trying to count(distinct deviceID) over a data set of about 100GB in realtime and sink the aggregated results to an Elasticsearch index. I met a severe performance issue when running my Flink job and want to get some help from the community.


Flink version: 1.8.2
Running on YARN with 4 slots per TaskManager. My Flink task parallelism is set to 10, which equals the number of my Kafka source partitions. After running the job, I can observe high backpressure from the Flink dashboard. Any suggestions and help are highly appreciated.


The running SQL is as follows:


INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
SELECT aggId, pageId, statkey AS ts, SUM(cnt) AS expoCnt, COUNT(cnt) AS clkCnt
FROM (
    SELECT
        aggId,
        pageId,
        statkey,
        COUNT(DISTINCT deviceId) AS cnt
    FROM (
        SELECT
            'ZL_005' AS aggId,
            'ZL_UV_PER_MINUTE' AS pageId,
            deviceId,
            ts2Date(recvTime) AS statkey
        FROM kafka_zl_etrack_event_stream
    )
    GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
) AS t1
GROUP BY aggId, pageId, statkey

Best

Re: Flink SQL Count Distinct performance optimization

Posted by Kurt Young <yk...@gmail.com>.
Hi,

Could you try to find out what's the bottleneck of your current job? This
would lead to
different optimizations, such as whether it's CPU bound, or you have too
big a local
state and are thus stuck on too many slow IOs.

Best,
Kurt


On Wed, Jan 8, 2020 at 3:53 PM 贺小令 <go...@gmail.com> wrote:

> hi sunfulin,
> you can try the blink planner (since 1.9+), which optimizes distinct
> aggregation. You can also try to enable
> *table.optimizer.distinct-agg.split.enabled* if the data is skewed.
>
> best,
> godfreyhe
>
> sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 3:39 PM:
>
>> Hi, community,
>> I'm using Apache Flink SQL to build some of my realtime streaming apps.
>> With one scenario I'm trying to count(distinct deviceID) over about 100GB
>> data set in realtime, and aggregate results with sink to ElasticSearch
>> index. I met a severe performance issue when running my Flink job and want
>> to get some help from the community.
>>
>>
>> Flink version : 1.8.2
>> Running on yarn with 4 yarn slots per task manager. My flink task
>> parallelism is set to be 10, which is equal to my kafka source partitions.
>> After running the job, I can observe high backpressure from the Flink
>> dashboard. Any suggestions and help are highly appreciated.
>>
>>
>> running sql is like the following:
>>
>>
>> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>>
>> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
>> clkCnt  from
>>
>> (
>>
>>     SELECT
>>
>>      aggId,
>>
>>      pageId,
>>
>>      statkey,
>>
>>      COUNT(DISTINCT deviceId) as cnt
>>
>>      FROM
>>
>>      (
>>
>>          SELECT
>>
>>          'ZL_005' as aggId,
>>
>>          'ZL_UV_PER_MINUTE' as pageId,
>>
>>          deviceId,
>>
>>          ts2Date(recvTime) as statkey
>>
>>          from
>>
>>          kafka_zl_etrack_event_stream
>>
>>      )
>>
>>      GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>>
>> ) as t1
>>
>> group by aggId, pageId, statkey
>>
>>
>> Best
>
>

Re: Flink SQL Count Distinct performance optimization

Posted by 贺小令 <go...@gmail.com>.
hi sunfulin,
you can try the blink planner (since 1.9+), which optimizes distinct
aggregation. You can also try to enable
*table.optimizer.distinct-agg.split.enabled* if the data is skewed.

best,
godfreyhe
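
(For reference, a minimal sketch of enabling the blink planner together with this option, assuming Flink 1.9 where TableConfig#getConfiguration is available:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkPlannerSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner in streaming mode (available since Flink 1.9).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Splits COUNT(DISTINCT ...) into a two-level aggregation automatically,
        // similar to the manual MOD(hashCode(deviceId), 1024) bucketing above.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.distinct-agg.split.enabled", true);
    }
}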

sunfulin <su...@163.com> wrote on Wed, Jan 8, 2020 at 3:39 PM:

> Hi, community,
> I'm using Apache Flink SQL to build some of my realtime streaming apps.
> With one scenario I'm trying to count(distinct deviceID) over about 100GB
> data set in realtime, and aggregate results with sink to ElasticSearch
> index. I met a severe performance issue when running my Flink job and want
> to get some help from the community.
>
>
> Flink version : 1.8.2
> Running on yarn with 4 yarn slots per task manager. My flink task
> parallelism is set to be 10, which is equal to my kafka source partitions.
> After running the job, I can observe high backpressure from the Flink
> dashboard. Any suggestions and help are highly appreciated.
>
>
> running sql is like the following:
>
>
> INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)
>
> select aggId, pageId, statkey as ts, sum(cnt) as expoCnt, count(cnt) as
> clkCnt  from
>
> (
>
>     SELECT
>
>      aggId,
>
>      pageId,
>
>      statkey,
>
>      COUNT(DISTINCT deviceId) as cnt
>
>      FROM
>
>      (
>
>          SELECT
>
>          'ZL_005' as aggId,
>
>          'ZL_UV_PER_MINUTE' as pageId,
>
>          deviceId,
>
>          ts2Date(recvTime) as statkey
>
>          from
>
>          kafka_zl_etrack_event_stream
>
>      )
>
>      GROUP BY aggId, pageId, statkey, MOD(hashCode(deviceId), 1024)
>
> ) as t1
>
> group by aggId, pageId, statkey
>
> Best
