Posted to user@beam.apache.org by St...@natwestmarkets.com on 2020/04/22 16:02:40 UTC

Apache beam job on Flink checkpoint size growing over time

One of our Apache Beam jobs running on the FlinkRunner is showing odd behaviour with its checkpoint size. The state backend is file-based. The job receives traffic once a day for a period of an hour and is then idle until it receives more data.

The checkpoint grows steadily as we process more data. However, its size does not decrease significantly once data has stopped being consumed for that day.
We thought it might be a bottleneck in the database sink; however, the same behaviour is present if we remove the sink and simply dump the data.
The checkpoint size over time resembles a stepped graph, e.g.:
*         checkpoint = 120KB (starting size checkpoint)
*         checkpoint = 409MB (starts receiving data)
*         checkpoint = 850MB (processing the backlog data)
*         checkpoint = 503MB (finished processing data)
*         checkpoint = 1.2GB (begins processing new data and backlog)
*         checkpoint = 700MB (finished processing data)
*         checkpoint = 700MB (new starting size for checkpoint)
*         ...

Has anyone seen this behaviour before? Is this a known issue with Flink checkpointing when using Apache Beam?
Thanks,
Steve


Stephen Hesketh | Client Analytics Technology
The information classification of this email is Confidential unless otherwise stated.



This communication and any attachments are confidential and intended solely for the addressee. If you are not the intended recipient please advise us immediately and delete it. Unless specifically stated in the message or otherwise indicated, you may not duplicate, redistribute or forward this message and any attachments are not intended for distribution to, or use by any person or entity in any jurisdiction or country where such distribution or use would be contrary to local law or regulation. NatWest Markets Plc  or any affiliated entity ("NatWest Markets") accepts no responsibility for any changes made to this message after it was sent.

Unless otherwise specifically indicated, the contents of this communication and its attachments are for information purposes only and should not be regarded as an offer or solicitation to buy or sell a product or service, confirmation of any transaction, a valuation, indicative price or an official statement. Trading desks may have a position or interest that is inconsistent with any views expressed in this message. In evaluating the information contained in this message, you should know that it could have been previously provided to other clients and/or internal NatWest Markets personnel, who could have already acted on it.

NatWest Markets cannot provide absolute assurances that all electronic communications (sent or received) are secure, error free, not corrupted, incomplete or virus free and/or that they will not be lost, mis-delivered, destroyed, delayed or intercepted/decrypted by others. Therefore NatWest Markets disclaims all liability with regards to electronic communications (and the contents therein) if they are corrupted, lost destroyed, delayed, incomplete, mis-delivered, intercepted, decrypted or otherwise misappropriated by others.

Any electronic communication that is conducted within or through NatWest Markets systems will be subject to being archived, monitored and produced to regulators and in litigation in accordance with NatWest Markets’ policy and local laws, rules and regulations. Unless expressly prohibited by local law, electronic communications may be archived in countries other than the country in which you are located, and may be treated in accordance with the laws and regulations of the country of each individual included in the entire chain.

Copyright NatWest Markets Plc. All rights reserved. See https://www.nwm.com/disclaimer for further risk disclosure.

RE: Apache beam job on Flink checkpoint size growing over time

Posted by St...@natwestmarkets.com.
We have resolved the issue.

The problem was the global window in the fusion break. Despite discarding panes and not explicitly saving any state, using the global window caused state to build up in the checkpoint.

Moving to a short sliding window based on processing time allows the state to be released and stops the checkpoint growth.
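
As a rough illustration of why a short window can release state where the global window cannot, here is a toy model in plain Java. It is only a sketch under stated assumptions: WindowedState, its methods and the 10 s / 5 s window settings are hypothetical and are not Beam or Flink classes, and the thread does not confirm exactly which state the runner was retaining. The model simply keeps state per window and drops a window's state once the window has ended, which a global window never does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class WindowStateModel {

    /** Toy stand-in for runner-managed, per-window state (hypothetical, not a Beam class). */
    static class WindowedState {
        // Window end time (ms) -> elements held for that window.
        private final TreeMap<Long, List<Integer>> byWindowEnd = new TreeMap<>();
        private final boolean global;
        private final long sizeMs;   // window length
        private final long periodMs; // how often a new window starts

        WindowedState(boolean global, long sizeMs, long periodMs) {
            this.global = global;
            this.sizeMs = sizeMs;
            this.periodMs = periodMs;
        }

        /** Assigns the element to its window(s) based on the current (processing) time. */
        void add(int element, long nowMs) {
            if (global) {
                // The global window never ends.
                byWindowEnd.computeIfAbsent(Long.MAX_VALUE, k -> new ArrayList<>()).add(element);
                return;
            }
            // Sliding assignment: every window [start, start + size) that contains nowMs.
            long lastStart = nowMs - (nowMs % periodMs);
            for (long start = lastStart; start + sizeMs > nowMs; start -= periodMs) {
                byWindowEnd.computeIfAbsent(start + sizeMs, k -> new ArrayList<>()).add(element);
            }
        }

        /** Releases the state of every window that has ended by nowMs. */
        void expire(long nowMs) {
            byWindowEnd.headMap(nowMs, true).clear();
        }

        int retained() {
            int total = 0;
            for (List<Integer> elements : byWindowEnd.values()) {
                total += elements.size();
            }
            return total;
        }
    }

    /** Feeds 1000 elements during the first second, then idles until t = 60 s. */
    static int retainedAfterIdle(boolean global) {
        WindowedState state = new WindowedState(global, 10_000, 5_000);
        for (int i = 0; i < 1000; i++) {
            state.add(i, i);
        }
        state.expire(60_000);
        return state.retained();
    }

    public static void main(String[] args) {
        System.out.println("global:  " + retainedAfterIdle(true));  // prints "global:  1000"
        System.out.println("sliding: " + retainedAfterIdle(false)); // prints "sliding: 0"
    }
}
```

In this model each element lands in two overlapping sliding windows while it is live, but everything is released once those windows end, whereas the global-window variant still holds all 1000 elements after the idle period.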

Regards,

Steve


-----Original Message-----
From: Hesketh, Stephen (Technology, NatWest Markets) 
Sent: 14 May 2020 18:18
To: user@beam.apache.org; mxm@apache.org
Cc: Dunkley, Bryan (NatWest Markets)
Subject: RE: Apache beam job on Flink checkpoint size growing over time

Hi all,

This issue is linked to the fusion break we use to distribute the events from a simple, single-threaded unbounded source that stores no data in the checkpoint.

I have recreated it on a simple pipeline that:

 - generates dummy events (just holding an id) from an UnboundedSource (returning Collections.singletonList(this) from split so that a single source instance is active on one thread)
 - uses the fusion break attached
 - has a dummy output DoFn

The checkpoint size steadily increases when running on the FlinkRunner. The pipeline options are the simple command line args below.

Any suggestions would be appreciated! Does Flink not require / like fusion breaks?

Cheers,

Steve


--runner=flink
--flinkMaster=XXXXXXXXXXXXXXXX:NNNN
--streaming=true
--parallelism=1
--checkpointingInterval=20000
--checkpointTimeoutMillis=10000
--minPauseBetweenCheckpoints=10000
--failOnCheckpointingErrors=false
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
--numberOfExecutionRetries=2
--executionRetryDelay=1000


Stephen Hesketh

-----Original Message-----
From: Hesketh, Stephen (Technology, NatWest Markets) 
Sent: 01 May 2020 15:04
To: mxm@apache.org; user@beam.apache.org
Subject: RE: Apache beam job on Flink checkpoint size growing over time


*********************************************
"This is an external email. Do you know who has sent it? Can you be sure that any links and attachments contained within it are safe? If in any doubt, use the Phishing Reporter Button in your Outlook client or forward the email as an attachment to ~ I've Been Phished"
*********************************************

Hi Max,

We could test a patched Beam version if it might help resolve the issue.

What would be involved in getting the patched version over? (We may need to get security approval to download it.)

Many thanks,

Steve


Stephen Hesketh


-----Original Message-----
From: Maximilian Michels [mailto:mxm@apache.org] 
Sent: 30 April 2020 10:17
To: Hesketh, Stephen (Technology, NatWest Markets); user@beam.apache.org
Subject: Re: Apache beam job on Flink checkpoint size growing over time


Hey Steve,

The checkpoint buffer is cleared "lazily" when a new bundle is started.
Theoretically, if no more data were to arrive after a checkpoint, then
this buffer would not be cleared until more data arrived. If this
repeats every time new data comes in, then some data would always remain
buffered, which one could observe in the checkpoint state.

We could fix this by making sure the buffer is flushed when a checkpoint
completes, instead of merely flushing it "lazily" when new data arrives.
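
The difference between the two flushing strategies can be sketched with a toy model in plain Java. This is only an illustration of the behaviour described above, not the Flink Runner's actual code: BundleBuffer and its methods are hypothetical names, and the "checkpoint size" is simply the number of elements still held in the buffer.

```java
import java.util.ArrayList;
import java.util.List;

public class LazyFlushModel {

    /** Hypothetical stand-in for the runner's buffer; not a Beam or Flink class. */
    static class BundleBuffer {
        private final List<String> buffered = new ArrayList<>();
        private final boolean flushOnCheckpointComplete;

        BundleBuffer(boolean flushOnCheckpointComplete) {
            this.flushOnCheckpointComplete = flushOnCheckpointComplete;
        }

        /** Lazy flush: the previous contents are dropped only when a new bundle starts. */
        void startBundle() {
            buffered.clear();
        }

        void process(String element) {
            buffered.add(element);
        }

        /** Number of elements that would be written into a checkpoint taken now. */
        int snapshotSize() {
            return buffered.size();
        }

        /** Eager variant: also flush as soon as a checkpoint has completed. */
        void checkpointComplete() {
            if (flushOnCheckpointComplete) {
                buffered.clear();
            }
        }
    }

    /** Processes three elements, completes a checkpoint, then checkpoints again while idle. */
    static int idleCheckpointSize(boolean flushOnCheckpointComplete) {
        BundleBuffer buffer = new BundleBuffer(flushOnCheckpointComplete);
        buffer.startBundle();
        buffer.process("a");
        buffer.process("b");
        buffer.process("c");
        buffer.snapshotSize();       // checkpoint taken while data is flowing
        buffer.checkpointComplete();
        // No more data arrives, so no new bundle starts before the next checkpoint.
        return buffer.snapshotSize();
    }

    public static void main(String[] args) {
        System.out.println("lazy flush:  " + idleCheckpointSize(false)); // prints "lazy flush:  3"
        System.out.println("eager flush: " + idleCheckpointSize(true));  // prints "eager flush: 0"
    }
}
```

With lazy flushing, the checkpoint taken during the idle period still contains the three elements; flushing on checkpoint completion leaves it empty.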

Do you have the option to test a patched Beam version?

Best,
Max

On 29.04.20 14:34, Stephen.Hesketh@natwestmarkets.com wrote:
> Hi Max,
> 
> The ingestion covers EOD processing from a Kafka source, so we get a lot of data from 5pm-8pm and no data outside that time. The checkpoint is just storing the Kafka offset for restart.
> 
> It sounds like there could be an open buffer during the period of no data. I would have thought that it would be cleared soon after data starts flowing again, though, and wouldn't lead to an increase in checkpoint size over a number of days.
> 
> Unless we are missing something in Beam and aren't actually triggering a new start bundle at any point, which would explain why the buffer continues to grow and is never flushed?
> 
> I am going to try to recreate it on a very simple test pipeline.
> 
> For reference, we are using Flink 1.8.0 and Apache Beam 2.16 at the moment.
> 
> Many thanks,
> 
> Steve
> 
> 
> Stephen Hesketh
> 
> 
> -----Original Message-----
> From: Maximilian Michels [mailto:mxm@apache.org] 
> Sent: 22 April 2020 20:38
> To: user@beam.apache.org; Hesketh, Stephen (Technology, NatWest Markets)
> Subject: Re: Apache beam job on Flink checkpoint size growing over time
> 
> 
> Hi Steve,
> 
> The Flink Runner buffers data as part of the checkpoint. This was
> originally due to a limitation of Flink where we weren't able to end the
> bundle before we persisted the state for a checkpoint. This is due to
> how checkpoint barriers are emitted; I'll spare you the details*.
> 
> Does the data ingestion completely stop at some point? I'm asking because
> the buffer is only flushed when a new bundle is started. So you might be
> persisting data which could have already been flushed out.
> 
> Cheers,
> Max
> 
> *Since Flink version 1.7 it is actually possible to flush all bundle
> data before we send checkpoint barriers out but that may also affect
> checkpoint barrier alignment and thus we opted for keeping the buffering
> on checkpoints.
> 

> 
> Copyright NatWest Markets Plc. All rights reserved. See https://www.nwm.com/disclaimer for further risk disclosure.
> 


This communication and any attachments are confidential and intended solely for the addressee. If you are not the intended recipient please advise us immediately and delete it. Unless specifically stated in the message or otherwise indicated, you may not duplicate, redistribute or forward this message and any attachments are not intended for distribution to, or use by any person or entity in any jurisdiction or country where such distribution or use would be contrary to local law or regulation. NatWest Markets Plc  or any affiliated entity ("NatWest Markets") accepts no responsibility for any changes made to this message after it was sent.

Unless otherwise specifically indicated, the contents of this communication and its attachments are for information purposes only and should not be regarded as an offer or solicitation to buy or sell a product or service, confirmation of any transaction, a valuation, indicative price or an official statement. Trading desks may have a position or interest that is inconsistent with any views expressed in this message. In evaluating the information contained in this message, you should know that it could have been previously provided to other clients and/or internal NatWest Markets personnel, who could have already acted on it.

NatWest Markets cannot provide absolute assurances that all electronic communications (sent or received) are secure, error free, not corrupted, incomplete or virus free and/or that they will not be lost, mis-delivered, destroyed, delayed or intercepted/decrypted by others. Therefore NatWest Markets disclaims all liability with regards to electronic communications (and the contents therein) if they are corrupted, lost destroyed, delayed, incomplete, mis-delivered, intercepted, decrypted or otherwise misappropriated by others.

Any electronic communication that is conducted within or through NatWest Markets systems will be subject to being archived, monitored and produced to regulators and in litigation in accordance with NatWest Markets’ policy and local laws, rules and regulations. Unless expressly prohibited by local law, electronic communications may be archived in countries other than the country in which you are located, and may be treated in accordance with the laws and regulations of the country of each individual included in the entire chain.

Copyright NatWest Markets Plc. All rights reserved. See https://www.nwm.com/disclaimer for further risk disclosure.


This communication and any attachments are confidential and intended solely for the addressee. If you are not the intended recipient please advise us immediately and delete it. Unless specifically stated in the message or otherwise indicated, you may not duplicate, redistribute or forward this message and any attachments are not intended for distribution to, or use by any person or entity in any jurisdiction or country where such distribution or use would be contrary to local law or regulation. NatWest Markets Plc  or any affiliated entity ("NatWest Markets") accepts no responsibility for any changes made to this message after it was sent.

Unless otherwise specifically indicated, the contents of this communication and its attachments are for information purposes only and should not be regarded as an offer or solicitation to buy or sell a product or service, confirmation of any transaction, a valuation, indicative price or an official statement. Trading desks may have a position or interest that is inconsistent with any views expressed in this message. In evaluating the information contained in this message, you should know that it could have been previously provided to other clients and/or internal NatWest Markets personnel, who could have already acted on it.

NatWest Markets cannot provide absolute assurances that all electronic communications (sent or received) are secure, error free, not corrupted, incomplete or virus free and/or that they will not be lost, mis-delivered, destroyed, delayed or intercepted/decrypted by others. Therefore NatWest Markets disclaims all liability with regards to electronic communications (and the contents therein) if they are corrupted, lost destroyed, delayed, incomplete, mis-delivered, intercepted, decrypted or otherwise misappropriated by others.

Any electronic communication that is conducted within or through NatWest Markets systems will be subject to being archived, monitored and produced to regulators and in litigation in accordance with NatWest Markets’ policy and local laws, rules and regulations. Unless expressly prohibited by local law, electronic communications may be archived in countries other than the country in which you are located, and may be treated in accordance with the laws and regulations of the country of each individual included in the entire chain.

Copyright NatWest Markets Plc. All rights reserved. See https://www.nwm.com/disclaimer for further risk disclosure.

RE: Apache beam job on Flink checkpoint size growing over time

Posted by St...@natwestmarkets.com.
Hi Max,

We could test a patched Beam pipeline if it might help resolve the issue.

What would be involved in getting the patched version over? (We may need security approval to download it.)

Many thanks,

Steve


Stephen Hesketh | Client Analytics Technology
S +44 (0)7968 039848
+ stephen.hesketh@natwestmarkets.co.uk 
250 Bishopsgate | London | EC2M 4AA
The information classification of this email is Confidential unless otherwise stated. 


-----Original Message-----
From: Maximilian Michels [mailto:mxm@apache.org] 
Sent: 30 April 2020 10:17
To: Hesketh, Stephen (Technology, NatWest Markets); user@beam.apache.org
Subject: Re: Apache beam job on Flink checkpoint size growing over time


Hey Steve,

The checkpoint buffer is cleared "lazily" when a new bundle is started.
Theoretically, if no more data were to arrive after a checkpoint, this
buffer would not be cleared until more data arrived. If this repeats
every time new data comes in, some data would always remain buffered,
which one could observe in the checkpoint state.

We could fix this by making sure the buffer is flushed when a checkpoint
completes, instead of merely flushing it "lazily" when new data arrives.

Do you have the option to test a patched Beam version?

Best,
Max


Re: Apache beam job on Flink checkpoint size growing over time

Posted by Maximilian Michels <mx...@apache.org>.
Hey Steve,

The checkpoint buffer is cleared "lazily" when a new bundle is started.
Theoretically, if no more data were to arrive after a checkpoint, this
buffer would not be cleared until more data arrived. If this repeats
every time new data comes in, some data would always remain buffered,
which one could observe in the checkpoint state.

We could fix this by making sure the buffer is flushed when a checkpoint
completes, instead of merely flushing it "lazily" when new data arrives.
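
The difference between the two flushing strategies can be sketched with a toy model. This is only an illustration under stated assumptions, not the Flink Runner's actual code: the class BufferingOperator, its methods, and the helper run_day are hypothetical names, and checkpoint size is modelled simply as the number of buffered elements.

```python
class BufferingOperator:
    """Toy model of an operator whose buffer is part of the checkpoint state."""

    def __init__(self, flush_on_checkpoint_complete):
        # Hypothetical switch: False = lazy flush only, True = proposed fix.
        self.flush_on_checkpoint_complete = flush_on_checkpoint_complete
        self.buffer = []   # elements held back; persisted with each checkpoint
        self.emitted = []  # elements released downstream

    def start_bundle(self):
        # Lazy flush: the buffer is only cleared when a new bundle starts.
        self._flush()

    def process(self, element):
        self.buffer.append(element)

    def snapshot_state(self):
        # Modelled checkpoint size: number of elements still buffered.
        return len(self.buffer)

    def notify_checkpoint_complete(self):
        # Proposed fix: also flush once the checkpoint has completed.
        if self.flush_on_checkpoint_complete:
            self._flush()

    def _flush(self):
        self.emitted.extend(self.buffer)
        self.buffer.clear()


def run_day(op, elements):
    """One burst of data, then two checkpoints while the job is idle."""
    op.start_bundle()
    for element in elements:
        op.process(element)
    sizes = []
    for _ in range(2):
        sizes.append(op.snapshot_state())
        op.notify_checkpoint_complete()
    return sizes


lazy = BufferingOperator(flush_on_checkpoint_complete=False)
eager = BufferingOperator(flush_on_checkpoint_complete=True)
lazy_sizes = run_day(lazy, range(5))
eager_sizes = run_day(eager, range(5))
print(lazy_sizes)   # [5, 5]
print(eager_sizes)  # [5, 0]
```

In this model the lazy variant keeps persisting the same five elements in every checkpoint taken during the idle period, while the variant that flushes on checkpoint completion drops back to an empty buffer after the first checkpoint.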

Do you have the option to test a patched Beam version?

Best,
Max

On 29.04.20 14:34, Stephen.Hesketh@natwestmarkets.com wrote:
> Hi Max,
> 
> The ingestion is covering EOD processing from a Kafka source, so we get a lot of data from 5pm-8pm and outside of that time we get no data. The checkpoint is just storing the Kafka offset for restart.
> 
> Sounds like during the period of no data there could be an open buffer. I would have thought that would be cleared soon after data starts flowing again though and wouldn't lead to an increase in checkpoint size over a number of days.
> 
> Unless we are missing something in Beam and never actually trigger a new bundle start, which would explain why the buffer continues to grow and is never flushed?
> 
> I am going to try to recreate this on a very simple test pipeline.
> 
> For reference, we are using Flink 1.8.0 and Apache Beam 2.16 at the moment.
> 
> Many thanks,
> 
> Steve
> 
> 
> Stephen Hesketh | Client Analytics Technology
> S +44 (0)7968 039848
> + stephen.hesketh@natwestmarkets.co.uk 
> 250 Bishopsgate | London | EC2M 4AA
> The information classification of this email is Confidential unless otherwise stated. 
> 
> 

RE: Apache beam job on Flink checkpoint size growing over time

Posted by St...@natwestmarkets.com.
Hi Max,

The ingestion is covering EOD processing from a Kafka source, so we get a lot of data from 5pm-8pm and outside of that time we get no data. The checkpoint is just storing the Kafka offset for restart.

Sounds like during the period of no data there could be an open buffer. I would have thought that would be cleared soon after data starts flowing again though and wouldn't lead to an increase in checkpoint size over a number of days.

Unless we are missing something in Beam and never actually trigger a new bundle start, which would explain why the buffer continues to grow and is never flushed?

I am going to try to recreate this on a very simple test pipeline.

For reference, we are using Flink 1.8.0 and Apache Beam 2.16 at the moment.

Many thanks,

Steve


Stephen Hesketh | Client Analytics Technology
S +44 (0)7968 039848
+ stephen.hesketh@natwestmarkets.co.uk 
250 Bishopsgate | London | EC2M 4AA
The information classification of this email is Confidential unless otherwise stated. 


-----Original Message-----
From: Maximilian Michels [mailto:mxm@apache.org] 
Sent: 22 April 2020 20:38
To: user@beam.apache.org; Hesketh, Stephen (Technology, NatWest Markets)
Subject: Re: Apache beam job on Flink checkpoint size growing over time


Hi Steve,

The Flink Runner buffers data as part of the checkpoint. This was
originally due to a limitation of Flink where we weren't able to end the
bundle before we persisted the state for a checkpoint. The limitation
comes from how checkpoint barriers are emitted; I'll spare you the
details*.

Does the data ingestion completely stop at some point? I'm asking because
the buffer is only flushed when a new bundle is started. So you might be
persisting data which could have already been flushed out.

Cheers,
Max

*Since Flink version 1.7, it is actually possible to flush all bundle
data before we send checkpoint barriers out, but that may also affect
checkpoint barrier alignment, so we opted to keep the buffering on
checkpoints.



Re: Apache beam job on Flink checkpoint size growing over time

Posted by Maximilian Michels <mx...@apache.org>.
Hi Steve,

The Flink Runner buffers data as part of the checkpoint. This was
originally due to a limitation of Flink where we weren't able to end the
bundle before we persisted the state for a checkpoint. The limitation
comes from how checkpoint barriers are emitted; I'll spare you the
details*.

Does the data ingestion completely stop at some point? I'm asking because
the buffer is only flushed when a new bundle is started. So you might be
persisting data which could have already been flushed out.

Cheers,
Max

*Since Flink version 1.7, it is actually possible to flush all bundle
data before we send checkpoint barriers out, but that may also affect
checkpoint barrier alignment, so we opted to keep the buffering on
checkpoints.
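
The behaviour described above can be pictured with a toy Python model. This is not the Flink Runner's actual code: the class and method names (ToyBufferingOperator, start_bundle, checkpoint) are made up for illustration, and "checkpoint size" is simply the number of buffered elements. It assumes only what is stated above, namely that output is buffered as part of the checkpoint and flushed when a new bundle starts.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBufferingOperator:
    """Toy stand-in for an operator that buffers output at checkpoint time."""

    pending: list = field(default_factory=list)   # output of the open bundle
    buffered: list = field(default_factory=list)  # output stored with checkpoints
    emitted: list = field(default_factory=list)   # output sent downstream

    def start_bundle(self):
        # The only place where previously buffered output is flushed.
        self.emitted.extend(self.buffered)
        self.buffered.clear()

    def process(self, elements):
        # New data starts a new bundle, which flushes the old buffer first.
        self.start_bundle()
        self.pending.extend(elements)

    def checkpoint(self):
        # Output of the open bundle is moved into the buffer and stored
        # as part of the checkpoint instead of being emitted.
        self.buffered.extend(self.pending)
        self.pending.clear()
        return len(self.buffered)  # "checkpoint size" in elements


def simulate():
    op = ToyBufferingOperator()
    sizes = []
    op.process(range(1000))        # the daily load arrives
    sizes.append(op.checkpoint())  # buffer is written into the checkpoint
    for _ in range(3):             # idle period: checkpoints only, no bundles
        sizes.append(op.checkpoint())
    op.process(range(10))          # next day's first bundle flushes the buffer
    sizes.append(op.checkpoint())
    return sizes, len(op.emitted)


if __name__ == "__main__":
    print(simulate())
```

In this model the checkpoint stays at its peak size for the whole idle period and only drops once the next bundle starts. It does not by itself explain a baseline that grows from day to day.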

On 22.04.20 18:02, Stephen.Hesketh@natwestmarkets.com wrote:
> One of our *Apache Beam* jobs running through the *FlinkRunner* is
> *experiencing odd behaviour with checkpoint size*. The state backend is
> file-based. The job receives traffic once a day for a period of an hour
> and then is idle until it receives more data.
>
> The checkpoint slowly grows in size as we process more data.
> However, the size of the checkpoint does not decrease significantly once
> data has stopped being consumed for that day.
>
> We thought it could potentially be a bottleneck with the database sink;
> however, the same behaviour is present if we remove the sink and simply
> dump the data.
>
> The behaviour seems to resemble a stepped graph, e.g.
>
> · checkpoint = *120KB* (starting size checkpoint)
> · checkpoint = *409MB* (starts receiving data)
> · checkpoint = *850MB* (processing the backlog data)
> · checkpoint = *503MB* (finished processing data)
> · checkpoint = *1.2GB* (begins processing new data and backlog)
> · checkpoint = *700MB* (finished processing data)
> · checkpoint = *700MB* (new starting size for checkpoint)
> · ...
>
> Has anyone seen this behaviour before? Is this a known issue with Flink
> checkpointing using Apache Beam?
>
> Thanks,
>
> Steve