Posted to user@flink.apache.org by Abhishek Singla <ab...@gmail.com> on 2023/03/26 15:58:08 UTC

Flink CEP Resource Utilisation Optimisation

Hi Team,

*Flink Version:* 1.15.0
*Java Version:* 1.8
*Standalone Cluster*
*Task Manager:* AWS EC2 instance type c5n.4xlarge (16 vCPUs, 42 GB memory,
8 slots per TM)
*CEP Scenario:* Kafka Event A followed by Kafka Event B within 10 mins
*Throughput:* 20k events per second for Event A, 0 for Kafka Event B
*State Backend:* FsStateBackend
*Unaligned Checkpoints:* Enabled
*asynchronousSnapshots:* true

While testing this scenario (Kafka Event A followed by Kafka Event B within
10 mins) in our load environment, it took 20 TM nodes to sustain this
throughput; with fewer nodes, either CPU utilization peaked or backpressure
was observed because the output buffers were full. The checkpoint size is
only 6.75 GB, and the state stored within the CEP operator should be much
smaller than that, since we use unaligned checkpoints (which also persist
in-flight data).
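
For a rough sense of scale, the numbers above can be turned into a
back-of-envelope estimate. This is only a sketch: it assumes every Event A
opens exactly one partial match that stays pending for the full 10 minutes
(no Event B ever arrives), and it treats 6.75 GB as decimal gigabytes. The
class and method names are made up for illustration and are not Flink API.

```java
// Back-of-envelope sizing only; names are illustrative, not Flink API.
class CepStateEstimate {
    static final long EVENTS_PER_SECOND = 20_000L;        // Event A rate from the post
    static final long WINDOW_SECONDS = 10 * 60L;          // "within 10 mins"
    static final long CHECKPOINT_BYTES = 6_750_000_000L;  // 6.75 GB, decimal GB assumed

    // Assumption: each Event A stays pending for the whole window,
    // because no matching Event B ever arrives.
    static long pendingPartialMatches() {
        return EVENTS_PER_SECOND * WINDOW_SECONDS;
    }

    // Average checkpoint bytes per in-window event. The checkpoint also
    // contains in-flight data and any other state, so this is not the
    // size of a single partial match.
    static double checkpointBytesPerPendingEvent() {
        return (double) CHECKPOINT_BYTES / pendingPartialMatches();
    }

    public static void main(String[] args) {
        System.out.println(pendingPartialMatches());          // 12000000
        System.out.println(checkpointBytesPerPendingEvent()); // 562.5
    }
}
```

Under these assumptions, roughly 12 million Event A records are inside the
window at any moment during the load test.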

I am looking for input on whether it should take this many resources to
achieve this throughput and, if not, what the issue could be.

I found one more issue: if the throughput of Event A drops to zero, the
checkpoint size still stays around 2 GB, even after hours. Is this
expected?

Regards,
Abhishek Singla

Re: Flink CEP Resource Utilisation Optimisation

Posted by Abhishek Singla <ab...@gmail.com>.
Hi Geng,

I tried the same experiment with version 1.16.0 and was able to achieve the
same throughput with 3 nodes (parallelism 24) at 60-70% CPU usage.
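
To put the two runs side by side, here is a small sketch of the per-slot
arithmetic. It assumes the 1.15.0 run used all 8 slots on each of the 20
nodes mentioned in the original post (the thread does not state the
parallelism of that run); the names are illustrative only.

```java
// Per-slot throughput comparison; names are illustrative, not Flink API.
class SlotThroughput {
    static final int SLOTS_PER_TM = 8;                 // from the original post
    static final double EVENTS_PER_SECOND = 20_000.0;  // Event A rate

    // Assumption: every slot on every TaskManager is used by the job.
    static int totalSlots(int taskManagers) {
        return taskManagers * SLOTS_PER_TM;
    }

    static double eventsPerSlot(int taskManagers) {
        return EVENTS_PER_SECOND / totalSlots(taskManagers);
    }

    public static void main(String[] args) {
        System.out.println(totalSlots(20));     // 160 (1.15.0 run)
        System.out.println(totalSlots(3));      // 24  (1.16.0 run)
        System.out.println(eventsPerSlot(20));  // 125.0
        System.out.println(eventsPerSlot(3));   // about 833 events/s per slot
    }
}
```

Under that assumption, each slot handled about 125 events per second on
1.15.0 versus about 833 on 1.16.0.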

Regarding the checkpoint size issue: since there were no input events, the
watermark was not advancing, and hence the events in CEP state were not
timing out.
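
The effect can be reproduced with a toy model. This is not Flink code and
does not mirror the CEP operator's real data structures; it only assumes
that events arrive in timestamp order, that each event pushes the watermark
to its own timestamp, and that a partial match is dropped once the watermark
reaches start + window. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of event-time timeouts; not Flink code.
class EventTimeTimeoutModel {
    private final long windowMillis;
    // Start timestamps of pending partial matches, oldest first.
    private final Deque<Long> pendingStartTimestamps = new ArrayDeque<>();
    private long watermark = Long.MIN_VALUE;

    EventTimeTimeoutModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // A new Event A opens a partial match and, in this simplified model,
    // advances the watermark to its own timestamp.
    void onEvent(long timestampMillis) {
        pendingStartTimestamps.addLast(timestampMillis);
        advanceWatermark(timestampMillis);
    }

    // Partial matches are dropped only when the watermark reaches
    // start + window.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!pendingStartTimestamps.isEmpty()
                && pendingStartTimestamps.peekFirst() + windowMillis <= watermark) {
            pendingStartTimestamps.removeFirst();
        }
    }

    // Wall-clock time passing changes nothing in event time.
    void onWallClockTick() {
        // intentionally empty
    }

    int pending() {
        return pendingStartTimestamps.size();
    }

    public static void main(String[] args) {
        EventTimeTimeoutModel model = new EventTimeTimeoutModel(600_000L); // 10 mins
        model.onEvent(0L);
        model.onEvent(1_000L);
        model.onEvent(2_000L);
        model.onWallClockTick();             // hours may pass here; no input arrives
        System.out.println(model.pending()); // 3
        model.onEvent(700_000L);             // input resumes, watermark jumps forward
        System.out.println(model.pending()); // 1
    }
}
```

In the model, the three old entries are released only when a later event
moves the watermark, which matches what I saw once input stopped.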

Thanks again for helping out.

Regards,
Abhishek Singla


Re: Flink CEP Resource Utilisation Optimisation

Posted by Abhishek Singla <ab...@gmail.com>.
Thanks, yes, there were a lot of keys in the test input. In fact, every
event has a unique key that is not repeated in subsequent events.


Re: Flink CEP Resource Utilisation Optimisation

Posted by Geng Biao <bi...@gmail.com>.
I see your point. Are there lots of different keys in your test input? If that is the case, the CEP operator in 1.15.0 will not clean up some intermediate state (partial matches are cleaned up on timeout, but some computation states are leaked). This is fixed in Flink 1.16 (FLINK-31017) by Juntao Hu.
Best,
Biao


Re: Flink CEP Resource Utilisation Optimisation

Posted by Abhishek Singla <ab...@gmail.com>.
Thanks, Geng, for the quick and actionable response.

I will definitely try this with Flink version >= 1.16.0 and get back with
the observations.

Regarding the checkpoint size issue, my concern is this: if there is no more
state, shouldn't the checkpoint size be far less than 2 GB? I was expecting
it to be only a few MB. Is there something I am missing here?

Regards,
Abhishek Singla


Re: Flink CEP Resource Utilisation Optimisation

Posted by Geng Biao <bi...@gmail.com>.
Hi Abhishek,

Thanks for sharing the experiment! As for the performance question, I believe you could try Flink CEP version >= 1.16.0, which includes the optimization introduced in FLINK-23890 <https://issues.apache.org/jira/browse/FLINK-23890>. This optimization removes a lot of timer registrations, which can increase throughput significantly. In our own experiment, with the same parallelism settings, the same job on 1.16.0 required much less CPU than on 1.15.x (~100% -> ~30%). In fact, given how it is implemented, the optimization should make CEP roughly 10x better. If you must use Flink 1.15.0 for some reason, you can cherry-pick the relevant change and recompile the CEP library yourself. The change does not depend on any framework changes, so it should not take much effort.

As for the checkpoint size issue, the CEP operator stores intermediate matching results in state. So if there are no new events, there are no new partial matches, and the CEP operator will not use more state.

Best,
Biao Geng
