You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Puneet Duggal <pu...@gmail.com> on 2022/10/17 20:05:36 UTC

Flink CEP Incremental Checkpoint Issue

Hi,

I am working on a use case which uses Flink CEP for pattern detection.

Flink Version - 1.12.1
Deployment Mode - Session Mode (Highly Available)
State Backend - RocksDB
Checkpoint Interval - 2 mins
Checkpoint Mode - Exactly Once

CEP pattern looks something like - A not_followed_by B within (40mins)
After Match Skip Strategy - Skip Past Last Event

In order to test out incremental checkpointing and its size, I deployed a job on a cluster (let's say cluster A, hence job name J(aa)) and that same job on cluster B 1 week later (Job Name J(ab)). Basically at any given point in time, both jobs( J(aa) and J(ab)) process exactly the same records. After 1 week of deployment of J(ab), I found out that in spite of working on the same records and window time of 40mins (after which unmatched patterns should expire), the incremental checkpoint size of J(aa) is around 40-50MB whereas that of J(ab) is 25-30MB. My assumption of the incremental checkpoint is that it only contains delta state change after the last checkpoint which is same for both jobs. Attached screenshots for J(ab) and J(aa) respectively.

J(ab)



J(aa)



Checkpoint Configuration




One more doubt on the same lines is that these jobs consume on an average 6 events per second with one event of the size around 2KB. Assuming a checkpoint interval of 2 mins and each event getting stored in CEP state, total delta size of the state should be 2*60*6*1.32 = 316KB which is nowhere near to size shown in the incremental checkpoint for both the jobs. Even including meta info for these records, not sure what am i missing which is causing incremental checkpoints to be so huge.


Regards,
Puneet


Re: Flink CEP Incremental Checkpoint Issue

Posted by Puneet Duggal <pu...@gmail.com>.
Hi Yun Tang,

Thank you for the response and yes went through some articles which explained rocksdb incremental checkpointing mechanism and makes sense w.r.t. metrics that i am seeing.

Regards,
Puneet

> On 22-Oct-2022, at 1:26 PM, Yun Tang <my...@live.com> wrote:
> 
> compaction


Re: Flink CEP Incremental Checkpoint Issue

Posted by Yun Tang <my...@live.com>.
Hi Puneet,

The incremental checkpoint size of RocksDB state-backend is not exactly the delta state change, it is the size of newly uploaded SST files (which are not uploaded before). The newly uploaded SST files are generated by compaction or data flush.
In other words, I don't think we should care about the checkpoint size too much. Instead, we should care more about the output results.

Best
Yun Tang
________________________________
From: Martijn Visser <ma...@apache.org>
Sent: Wednesday, October 19, 2022 22:03
To: Puneet Duggal <pu...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: Re: Flink CEP Incremental Checkpoint Issue

Hi,

Given that Flink 1.12 is no longer supported by the community, can you validate this with the latest Flink version? (Currently 1.15).
Next to that, the contents of your checkpoints is not only the results of your CEP, but given that you're using Exactly Once also there's internal information needed for providing those exactly once guarantees.

Best regards,

Martijn

On Mon, Oct 17, 2022 at 10:09 PM Puneet Duggal <pu...@gmail.com>> wrote:
Apologies for the mistake of calculation

120*6*2KB = 1440KB = 1.4MB

> On 18-Oct-2022, at 1:35 AM, Puneet Duggal <pu...@gmail.com>> wrote:
>
> Hi,
>
> I am working on a use case which uses Flink CEP for pattern detection.
>
> Flink Version - 1.12.1
> Deployment Mode - Session Mode (Highly Available)
> State Backend - RocksDB
> Checkpoint Interval - 2 mins
> Checkpoint Mode - Exactly Once
>
> CEP pattern looks something like - A not_followed_by B within (40mins)
> After Match Skip Strategy - Skip Past Last Event
>
> In order to test out incremental checkpointing and its size, I deployed a job on a cluster (let's say cluster A, hence job name J(aa)) and that same job on cluster B 1 week later (Job Name J(ab)). Basically at any given point in time, both jobs( J(aa) and J(ab)) process exactly the same records. After 1 week of deployment of J(ab), I found out that in spite of working on the same records and window time of 40mins (after which unmatched patterns should expire), the incremental checkpoint size of J(aa) is around 40-50MB whereas that of J(ab) is 25-30MB. My assumption of the incremental checkpoint is that it only contains delta state change after the last checkpoint which is same for both jobs. Attached screenshots for J(ab) and J(aa) respectively.
>
> J(ab)
>
> <Screenshot 2022-10-18 at 1.25.18 AM.png>
>
> J(aa)
>
> <Screenshot 2022-10-18 at 1.26.25 AM.png>
>
> Checkpoint Configuration
>
> <Screenshot 2022-10-18 at 1.29.10 AM.png>
>
>
> One more doubt on the same lines is that these jobs consume on an average 6 events per second with one event of the size around 2KB. Assuming a checkpoint interval of 2 mins and each event getting stored in CEP state, total delta size of the state should be 2*60*6*1.32 = 316KB which is nowhere near to size shown in the incremental checkpoint for both the jobs. Even including meta info for these records, not sure what am i missing which is causing incremental checkpoints to be so huge.
>
>
> Regards,
> Puneet
>


Re: Flink CEP Incremental Checkpoint Issue

Posted by Martijn Visser <ma...@apache.org>.
Hi,

Given that Flink 1.12 is no longer supported by the community, can you
validate this with the latest Flink version? (Currently 1.15).
Next to that, the contents of your checkpoints is not only the results of
your CEP, but given that you're using Exactly Once also there's internal
information needed for providing those exactly once guarantees.

Best regards,

Martijn

On Mon, Oct 17, 2022 at 10:09 PM Puneet Duggal <pu...@gmail.com>
wrote:

> Apologies for the mistake of calculation
>
> 120*6*2KB = 1440KB = 1.4MB
>
> > On 18-Oct-2022, at 1:35 AM, Puneet Duggal <pu...@gmail.com>
> wrote:
> >
> > Hi,
> >
> > I am working on a use case which uses Flink CEP for pattern detection.
> >
> > Flink Version - 1.12.1
> > Deployment Mode - Session Mode (Highly Available)
> > State Backend - RocksDB
> > Checkpoint Interval - 2 mins
> > Checkpoint Mode - Exactly Once
> >
> > CEP pattern looks something like - A not_followed_by B within (40mins)
> > After Match Skip Strategy - Skip Past Last Event
> >
> > In order to test out incremental checkpointing and its size, I deployed
> a job on a cluster (let's say cluster A, hence job name J(aa)) and that
> same job on cluster B 1 week later (Job Name J(ab)). Basically at any given
> point in time, both jobs( J(aa) and J(ab)) process exactly the same
> records. After 1 week of deployment of J(ab), I found out that in spite of
> working on the same records and window time of 40mins (after which
> unmatched patterns should expire), the incremental checkpoint size of J(aa)
> is around 40-50MB whereas that of J(ab) is 25-30MB. My assumption of the
> incremental checkpoint is that it only contains delta state change after
> the last checkpoint which is same for both jobs. Attached screenshots for
> J(ab) and J(aa) respectively.
> >
> > J(ab)
> >
> > <Screenshot 2022-10-18 at 1.25.18 AM.png>
> >
> > J(aa)
> >
> > <Screenshot 2022-10-18 at 1.26.25 AM.png>
> >
> > Checkpoint Configuration
> >
> > <Screenshot 2022-10-18 at 1.29.10 AM.png>
> >
> >
> > One more doubt on the same lines is that these jobs consume on an
> average 6 events per second with one event of the size around 2KB. Assuming
> a checkpoint interval of 2 mins and each event getting stored in CEP state,
> total delta size of the state should be 2*60*6*1.32 = 316KB which is
> nowhere near to size shown in the incremental checkpoint for both the jobs.
> Even including meta info for these records, not sure what am i missing
> which is causing incremental checkpoints to be so huge.
> >
> >
> > Regards,
> > Puneet
> >
>
>

Re: Flink CEP Incremental Checkpoint Issue

Posted by Puneet Duggal <pu...@gmail.com>.
Apologies for the mistake of calculation

120*6*2KB = 1440KB = 1.4MB

> On 18-Oct-2022, at 1:35 AM, Puneet Duggal <pu...@gmail.com> wrote:
> 
> Hi,
> 
> I am working on a use case which uses Flink CEP for pattern detection.
> 
> Flink Version - 1.12.1
> Deployment Mode - Session Mode (Highly Available)
> State Backend - RocksDB
> Checkpoint Interval - 2 mins
> Checkpoint Mode - Exactly Once
> 
> CEP pattern looks something like - A not_followed_by B within (40mins)
> After Match Skip Strategy - Skip Past Last Event
> 
> In order to test out incremental checkpointing and its size, I deployed a job on a cluster (let's say cluster A, hence job name J(aa)) and that same job on cluster B 1 week later (Job Name J(ab)). Basically at any given point in time, both jobs( J(aa) and J(ab)) process exactly the same records. After 1 week of deployment of J(ab), I found out that in spite of working on the same records and window time of 40mins (after which unmatched patterns should expire), the incremental checkpoint size of J(aa) is around 40-50MB whereas that of J(ab) is 25-30MB. My assumption of the incremental checkpoint is that it only contains delta state change after the last checkpoint which is same for both jobs. Attached screenshots for J(ab) and J(aa) respectively.
> 
> J(ab)
> 
> <Screenshot 2022-10-18 at 1.25.18 AM.png>
> 
> J(aa)
> 
> <Screenshot 2022-10-18 at 1.26.25 AM.png>
> 
> Checkpoint Configuration
> 
> <Screenshot 2022-10-18 at 1.29.10 AM.png>
> 
> 
> One more doubt on the same lines is that these jobs consume on an average 6 events per second with one event of the size around 2KB. Assuming a checkpoint interval of 2 mins and each event getting stored in CEP state, total delta size of the state should be 2*60*6*1.32 = 316KB which is nowhere near to size shown in the incremental checkpoint for both the jobs. Even including meta info for these records, not sure what am i missing which is causing incremental checkpoints to be so huge.
> 
> 
> Regards,
> Puneet
>