Posted to dev@flink.apache.org by Andrey Zagrebin <az...@apache.org> on 2020/05/04 22:48:13 UTC

Re: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

Hi lsyldliu,

You can try tuning the StateTtlConfig. As the documentation notes [1],
incremental TTL cleanup can reduce per-record performance. This is the
price of automatic cleanup.
If state access is most of what your operator does, then checking even one
additional entry for cleanup doubles the work per record.
The timer approach was discussed during the design of the TTL feature. It
needs an additional implementation and keeps more state, but it performs
exactly one cleanup action, exactly when needed, so it is a
performance/storage trade-off.
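
To make the per-record cost concrete, here is a toy model in plain Java. It
is not Flink code: the class name, the round-robin scan queue and the
counters are assumptions made for illustration, loosely modeled on the lazy
iterator over state entries that the documentation describes. It only
counts how many state entries are touched per record for a given cleanup
size.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

/** Toy model, not Flink code: counts how many state entries are touched per record. */
final class IncrementalCleanupCost {
    private final Map<Integer, Long> expiresAt = new HashMap<>();     // key -> expiration time
    private final ArrayDeque<Integer> scanQueue = new ArrayDeque<>(); // round-robin cleanup cursor
    private final long ttlMs;
    private final int cleanupSize; // extra entries checked per state access
    private long entriesTouched;

    IncrementalCleanupCost(long ttlMs, int cleanupSize) {
        this.ttlMs = ttlMs;
        this.cleanupSize = cleanupSize;
    }

    /** One record: update its own entry, then check up to cleanupSize entries for expiry. */
    void access(int key, long nowMs) {
        entriesTouched++; // the access the operator actually needs
        if (expiresAt.put(key, nowMs + ttlMs) == null) {
            scanQueue.addLast(key); // a new key joins the scan order
        }
        for (int i = 0; i < cleanupSize && !scanQueue.isEmpty(); i++) {
            Integer candidate = scanQueue.pollFirst();
            entriesTouched++; // piggy-backed cleanup check
            if (expiresAt.get(candidate) <= nowMs) {
                expiresAt.remove(candidate);  // expired: drop it
            } else {
                scanQueue.addLast(candidate); // still live: revisit later
            }
        }
    }

    long entriesTouched() { return entriesTouched; }

    int liveEntries() { return expiresAt.size(); }

    public static void main(String[] args) {
        for (int cleanupSize : new int[] {0, 1, 5}) {
            IncrementalCleanupCost model = new IncrementalCleanupCost(60_000L, cleanupSize);
            for (int i = 0; i < 1_000; i++) {
                model.access(i % 100, i); // 100 keys, nothing expires within the run
            }
            System.out.println("cleanupSize=" + cleanupSize
                    + " touched=" + model.entriesTouched());
        }
    }
}
```

In this model, as long as the state is non-empty, every record touches
1 + cleanupSize entries, so a cleanup size of 1 already doubles the count.
It says nothing about the real cost of each check in Flink.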

Still, a 20x degradation does look like a lot.
As a first step, I would suggest configuring the incremental cleanup
explicitly in `StateTtlConfigUtil#createTtlConfig` with fewer entries to
check, e.g. 1, because processFirstRow/processLastRow already access the
state twice and perform cleanup:

.cleanupIncrementally(1, false)
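
For context, a sketch of how that call could fit into the builder chain
quoted further down in this thread. The class and method names here are
hypothetical and this is not the actual body of
`StateTtlConfigUtil#createTtlConfig`; it assumes the `StateTtlConfig` and
`Time` classes from the Flink versions discussed here and needs Flink on
the classpath to compile.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

/** Hypothetical helper, shown only to place the suggested call in context. */
final class DedupTtlConfigSketch {

    static StateTtlConfig createTtlConfig(long retentionTimeMs) {
        return StateTtlConfig
            .newBuilder(Time.milliseconds(retentionTimeMs))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            // Check 1 extra state entry per state access, and do not
            // additionally trigger cleanup for every processed record.
            .cleanupIncrementally(1, false)
            .build();
    }
}
```

The first argument is the number of entries checked per cleanup trigger;
the second controls whether cleanup also runs on every processed record.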


Also, I am not sure, but depending on the input data, finishBundle may run
mostly during snapshotting, which slows down taking the checkpoint.
Could this make the checkpoint fail, build up backpressure and slow down
the pipeline?

I am also not sure why the deduplication data is kept in a Java map and in
Flink state at the same time. Why not keep it only in Flink state and
deduplicate on each incoming record?

Best,
Andrey

[1] note 2 in
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#incremental-cleanup

On Wed, Apr 29, 2020 at 11:53 AM 刘大龙 <ld...@zju.edu.cn> wrote:

>
>
>
> > -----Original Message-----
> > From: "Jark Wu" <im...@gmail.com>
> > Sent: 2020-04-29 14:09:44 (Wednesday)
> > To: dev <de...@flink.apache.org>, "Yu Li" <ca...@gmail.com>, myasuka@live.com
> > Cc: azagrebin@apache.org
> > Subject: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation
> >
> > Hi lsyldliu,
> >
> > Thanks for investigating this.
> >
> > First of all, if you are using mini-batch deduplication, it doesn't
> > support state TTL in 1.9. That's why the TPS looks the same as in 1.11
> > with state TTL disabled.
> > We only recently introduced state TTL for mini-batch deduplication.
> >
> > Regarding the performance regression, it looks very surprising to me.
> > The performance is reduced by 19x when StateTtlConfig is enabled in 1.11.
> > I don't have much experience with the internals of StateTtlConfig, so I
> > am looping in @Yu Li <ca...@gmail.com> and @YunTang in CC, who may have
> > more insight on this.
> >
> > For more information, we use the following StateTtlConfig [1] in blink
> > planner:
> >
> > StateTtlConfig
> >   .newBuilder(Time.milliseconds(retentionTime))
> >   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> >   .build();
> >
> >
> > Best,
> > Jark
> >
> >
> > [1]:
> > https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateTtlConfigUtil.java#L27
> >
> >
> > On Wed, 29 Apr 2020 at 11:53, 刘大龙 <ld...@zju.edu.cn> wrote:
> >
> > > Hi, all!
> > >
> > > On the Flink master branch, we now support state TTL for SQL mini-batch
> > > deduplication using the incremental cleanup strategy on the heap
> > > backend; see FLINK-16581. To test the performance of this feature, I
> > > compiled the master branch, deployed the jar to our production
> > > environment, and ran three tests:
> > >
> > > Flink 1.9.0 release, state TTL enabled
> > > Flink 1.11-SNAPSHOT, state TTL disabled
> > > Flink 1.11-SNAPSHOT, state TTL enabled
> > >
> > > The test SQL query is as follows:
> > >
> > > select order_date,
> > >     sum(price * amount - goods_all_fav_amt - virtual_money_amt + goods_carriage_amt) as saleP,
> > >     sum(amount) as saleN,
> > >     count(distinct parent_sn) as orderN,
> > >     count(distinct user_id) as cusN
> > > from (
> > >     select order_date, user_id,
> > >         order_type, order_status, terminal, last_update_time, goods_all_fav_amt,
> > >         goods_carriage_amt, virtual_money_amt, price, amount,
> > >         order_quality, quality_goods_cnt, acture_goods_amt
> > >     from (
> > >         select *, row_number() over (partition by order_id, order_goods_id order by proctime desc) as rownum
> > >         from dm_trd_order_goods)
> > >     where rownum = 1
> > >         and (order_type in (1,2,3,4,5) or order_status = 70)
> > >         and terminal = 'shop' and price > 0)
> > > group by order_date
> > >
> > >
> > > At runtime, this query generates two operators: Deduplication and
> > > GroupAgg. The configuration was the same in all tests: parallelism 20,
> > > the Kafka consumer reading from the earliest offset, and mini-batch
> > > disabled. The test results are as follows:
> > >
> > > Flink 1.9.0, state TTL enabled: the test lasted 44m and Flink received
> > > 13.74 million records, an average TPS of 5200/s. [Flink UI screenshots:
> > > back pressure, checkpoint]
> > > Flink 1.11-SNAPSHOT, state TTL disabled: the test lasted 28m and Flink
> > > received 8.83 million records, an average TPS of 5200/s. [Flink UI
> > > screenshots: back pressure, checkpoint]
> > > Flink 1.11-SNAPSHOT, state TTL enabled: the test lasted 1h 43m, but
> > > Flink received only 1.68 million records, an average TPS of 270/s,
> > > because of severe back pressure in the deduplication operator.
> > > Checkpoints also always failed for the same reason. [Flink UI
> > > screenshots: back pressure, checkpoint]
> > >
> > > Deduplication state cleanup is implemented with timers in Flink 1.9.0,
> > > whereas 1.11-SNAPSHOT uses StateTtlConfig; this is the main difference.
> > > Comparing the three tests, 1.11-SNAPSHOT with state TTL disabled
> > > performs the same as 1.9.0 with state TTL enabled. However, with state
> > > TTL enabled in 1.11-SNAPSHOT, throughput drops by nearly 20x, so I
> > > think the incremental cleanup strategy causes this problem. What do you
> > > think, @azagrebin, @jark?
> > >
> > > Thanks.
> > >
> > > lsyldliu
> > >
> > > Zhejiang University, College of Control Science and Engineering, CSC
>
>
> ------------------------------
> 刘大龙
>
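> Zhejiang University, Department of Control Science and Engineering,
> Institute of Intelligent Systems and Control, New Industrial Control
> Building, Room 217
> Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou,
> Zhejiang Province
> Tel:18867547281
>
> Hi Jark,
> I used non-mini-batch deduplication and group aggregation for the tests.
> The non-mini-batch deduplication state TTL implementation has been
> refactored to use StateTtlConfig instead of timers on the current 1.11
> master branch (that PR is my work). I am also surprised by the 19x
> performance drop.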

Re: Re: The use of state ttl incremental cleanup strategy in sql deduplication resulting in significant performance degradation

Posted by Jark Wu <im...@gmail.com>.
Hi Andrey,

Thanks for the tuning ideas. Let me explain the design of deduplication.

The mini-batch implementation of deduplication buffers a bundle of input
data on the heap (in a Java Map). When the bundle size reaches the trigger
size or the trigger time elapses, the buffered data is processed together,
so we only need to access the state once per key. This is designed for the
RocksDB state backend, to reduce frequent state access and
(de)serialization. And yes, this may slow down the checkpoint, but the
suggested mini-batch timeout is <= 10s. In our production experience, it
doesn't have much impact on checkpoints.
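
As a rough illustration of the buffering idea, here is a toy model in plain
Java. It is not the Flink operator: the class name, the String keys and
rows, the HashMap standing in for keyed state, and the count-only trigger
are all assumptions (the time-based trigger is left out). It keeps the last
row per key and writes each key to "state" once per bundle.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model, not Flink code: keep-last-row deduplication with a heap bundle. */
final class MiniBatchDedupSketch {
    private final Map<String, String> bundle = new HashMap<>(); // heap buffer: key -> latest row
    private final Map<String, String> state = new HashMap<>();  // stand-in for keyed state
    private final int triggerSize; // flush after this many buffered records
    private int buffered;
    private int stateWrites;

    MiniBatchDedupSketch(int triggerSize) {
        this.triggerSize = triggerSize;
    }

    /** Buffer one record; a later row for the same key replaces the earlier one. */
    void addInput(String key, String row) {
        bundle.put(key, row);
        if (++buffered >= triggerSize) {
            finishBundle();
        }
    }

    /** Flush the bundle: one state write per distinct key, not per record. */
    void finishBundle() {
        for (Map.Entry<String, String> entry : bundle.entrySet()) {
            state.put(entry.getKey(), entry.getValue());
            stateWrites++;
        }
        bundle.clear();
        buffered = 0;
    }

    int stateWrites() { return stateWrites; }

    String lastRow(String key) { return state.get(key); }

    public static void main(String[] args) {
        MiniBatchDedupSketch dedup = new MiniBatchDedupSketch(6);
        String[] keys = {"a", "a", "b", "a", "b", "a"};
        for (int i = 0; i < keys.length; i++) {
            dedup.addInput(keys[i], "row-" + i);
        }
        System.out.println("records=" + keys.length + " stateWrites=" + dedup.stateWrites());
    }
}
```

The saving depends on how often keys repeat within a bundle; with all
distinct keys, the number of state writes equals the number of records.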

Best,
Jark
