Posted to user@flink.apache.org by zhijiang <wa...@aliyun.com> on 2019/03/01 03:58:41 UTC

Re: Checkpoints and catch-up burst (heavy back pressure)

Hi Arnaud,

Thanks for the further feedback!

For option 1: that 40 min is still not enough just indicates the checkpoint needs even more time to finish in your case. I have seen catch-up scenarios where a single checkpoint took several hours to complete. If the current checkpoint expires on timeout, the next triggered checkpoint is likely to time out as well. So it seems better to let the current checkpoint run until it finishes rather than expire it, unless you cannot afford such a long interval, for example because a failover during that time would have to restore more data.

For option 2: the default network settings should be fine. Lower values might cause a performance regression, and higher values would increase the number of in-flight buffers and delay the checkpoint even more.

For option 3: if your resources are already limited, this will indeed not help on your side.

Sleeping for some time in the source, as you mentioned, is an option that might work in your case, although it is not a graceful solution.

I think there is no data skew causing the backpressure in your case, since you use rebalance mode as mentioned. Another option might be to use forward mode instead of rebalance mode, if that is possible for you. In forward mode the source and the downstream task are connected one-to-one, so at most 2+2+8 in-flight buffers sit ahead of the barrier for a single downstream task. In rebalance mode the connection is all-to-all, so up to (a*2 + a*2 + 8) in-flight buffers sit ahead of the barrier for a single downstream task, where `a` is the parallelism of the source vertex. Barrier alignment therefore takes longer in rebalance mode than in forward mode.
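As a rough illustration (the source parallelism here is an assumed example value, not taken from your job): in forward mode at most 2 + 2 + 8 = 12 buffers can be queued ahead of the barrier for one downstream task, while in rebalance mode with a source parallelism of a = 4 up to 4*2 + 4*2 + 8 = 24 buffers can be queued, so the barrier has roughly twice as much data to wait behind.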

Best,
Zhijiang
------------------------------------------------------------------
From: LINZ, Arnaud <AL...@bouyguestelecom.fr>
Send Time: March 1, 2019 (Friday) 00:46
To: zhijiang <wa...@aliyun.com>; user <us...@flink.apache.org>
Subject: RE: Checkpoints and catch-up burst (heavy back pressure)


Update :
Option 1 does not work. It still fails at the end of the timeout, no matter its value.
Should I implement a “bandwidth” management system by using an artificial Thread.sleep in the source, depending on the back pressure?
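A minimal sketch of what such a throttled source might look like, assuming a fixed files-per-second budget rather than real back-pressure sensing (the class name, the rate value and the listNewFiles() helper are placeholders, not the actual job code):

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

// Hypothetical throttled file-name source: it caps the emission rate with
// Thread.sleep so the downstream mappers/sinks can keep up during a catch-up burst.
public class ThrottledFileNameSource extends RichSourceFunction<String> {

    private final long maxFilesPerSecond;              // assumed tuning knob
    private volatile boolean running = true;

    public ThrottledFileNameSource(long maxFilesPerSecond) {
        this.maxFilesPerSecond = maxFilesPerSecond;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long pauseMs = 1000L / Math.max(1L, maxFilesPerSecond);
        while (running) {
            for (String fileName : listNewFiles()) {
                if (!running) {
                    return;
                }
                // Emit under the checkpoint lock so barriers stay consistent with the state.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(fileName);
                }
                Thread.sleep(pauseMs);                  // crude "bandwidth" cap
            }
            Thread.sleep(1000L);                        // idle wait before listing the directory again
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Placeholder: replace with the real HDFS directory listing.
    private List<String> listNewFiles() {
        return Collections.emptyList();
    }
}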
From: LINZ, Arnaud
Sent: Thursday, February 28, 2019 15:47
To: 'zhijiang' <wa...@aliyun.com>; user <us...@flink.apache.org>
Subject: RE: Checkpoints and catch-up burst (heavy back pressure)
Hi Zhijiang,
Thanks for your feedback.
I’ll try option 1; the timeout is 4 min for now, I’ll switch it to 40 min and will let you know. Setting it higher than 40 min does not make much sense, since after 40 min the pending output is already quite large.
Option 3 won’t work; I already take too many resources, and as my source is more or less an HDFS directory listing, it will always be far faster than any mapper that reads a file and emits records based on its content, or any sink that stores the transformed data, unless I put “sleeps” in it (but is that really a good idea?)
Option 2: taskmanager.network.memory.buffers-per-channel and taskmanager.network.memory.buffers-per-gate are currently unset in my configuration (so at their defaults of 2 and 8), but for this streaming app I have very few exchanges between nodes (just a rebalance after the source that emits file names; everything else is local to the node). Should I adjust their values nonetheless? To higher or lower values?
Best,
Arnaud
From: zhijiang <wa...@aliyun.com>
Sent: Thursday, February 28, 2019 10:58
To: user <us...@flink.apache.org>; LINZ, Arnaud <AL...@bouyguestelecom.fr>
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud,
I think there are two key points. First, the checkpoint barrier may be emitted from the source with a delay under high backpressure, because emission competes for the synchronization (checkpoint) lock.
Second, the barrier is queued behind the in-flight data buffers, so the downstream task has to process all the buffers ahead of the barrier before it can trigger the checkpoint, and that takes time under back pressure.
There are three ways to work around this:
1. Increase the checkpoint timeout so the checkpoint does not expire too early.
2. Decrease the network buffer settings to reduce the number of in-flight buffers ahead of the barrier; see the config options "taskmanager.network.memory.buffers-per-channel" and "taskmanager.network.memory.buffers-per-gate".
3. Adjust the parallelism, e.g. increase it for the sink vertex so the source data is processed faster, which avoids backpressure to some extent.
You could check which of these suits your scenario and give it a try; a minimal configuration sketch for options 1 and 2 follows below.
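A minimal configuration sketch for options 1 and 2, assuming a Flink 1.x DataStream job; the interval, timeout and buffer values are placeholders to tune, and the exact network option keys should be checked against the docs for your Flink version (in recent versions the per-gate key is named taskmanager.network.memory.floating-buffers-per-gate):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: exactly-once checkpoints every 10 minutes with a generous
        // 2-hour timeout, so a catch-up checkpoint is not expired prematurely.
        env.enableCheckpointing(10 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(2 * 60 * 60 * 1000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Option 2 is a cluster-level setting in flink-conf.yaml, e.g.:
        //   taskmanager.network.memory.buffers-per-channel: 2
        //   taskmanager.network.memory.floating-buffers-per-gate: 8
        // Lowering these reduces the in-flight data a barrier has to wait behind.

        // ... define the actual sources/mappers/sinks here, then:
        // env.execute("hdfs-catch-up-job");
    }
}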
Best,
Zhijiang
------------------------------------------------------------------
From: LINZ, Arnaud <AL...@bouyguestelecom.fr>
Send Time: February 28, 2019 (Thursday) 17:28
To: user <us...@flink.apache.org>
Subject: Checkpoints and catch-up burst (heavy back pressure)
Hello,
I have a simple streaming app that gets data from a source and stores it to HDFS using a sink similar to the bucketing file sink. Checkpointing mode is “exactly once”.
Everything is fine during a “normal” run, as the sink is faster than the source; but when we stop the application for a while and then restart it, we have a catch-up burst to process all the messages emitted in the meantime.
During this burst, the source is faster than the sink, and all checkpoints fail (time out) until the source has completely caught up. This is annoying because the sink does not “commit” the data before a successful checkpoint is made, so the app releases all the “catch-up” data as one atomic block, which can be huge if the streaming app was stopped for a while, adding unwanted stress to the Hadoop cluster and to all the downstream Hive treatments that consume the data in micro batches.
How should I handle the situation? Is there something special to do to get checkpoints even during heavy load?
The problem does not seem to be new, but I was unable to find any practical solution in the documentation.
Best regards,
Arnaud



 The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.


Re: Checkpoints and catch-up burst (heavy back pressure)

Posted by Ken Krugler <kk...@transpac.com>.
The amount of data you’re checkpointing (if it’s really limited to file lists) still seems too small to cause timeouts, unless there’s some other issue with either your configuration or where data is being written (thus my previous question #1).

— Ken



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


Re: Checkpoints and catch-up burst (heavy back pressure)

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

My source checkpoint is actually the file list. But it's not trivially small, as I may have hundreds of thousands of files with long filenames.
My sink checkpoint is a smaller list of HDFS files with their current sizes.
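For reference, a minimal sketch of how such a source might keep its pending file list in Flink operator state (class, state and helper names are illustrative, not the actual job code); it shows why the checkpoint grows with the number of file names:

import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

// Illustrative only: the source keeps the not-yet-emitted file names in
// operator state, so checkpoint size scales with that list.
public class FileListSource extends RichSourceFunction<String> implements CheckpointedFunction {

    private final Deque<String> pendingFiles = new ArrayDeque<>();
    private transient ListState<String> checkpointedFiles;
    private volatile boolean running = true;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedFiles.clear();
        for (String file : pendingFiles) {
            checkpointedFiles.add(file);   // hundreds of thousands of long names add up
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedFiles = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-files", String.class));
        for (String file : checkpointedFiles.get()) {
            pendingFiles.add(file);
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // In the real source, newly listed files would be added to pendingFiles here.
        while (running && !pendingFiles.isEmpty()) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(pendingFiles.poll());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}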



Re: Checkpoints and catch-up burst (heavy back pressure)

Posted by Ken Krugler <kk...@transpac.com>.
Hi Arnaud,

1. What’s your checkpoint configuration? Wondering if you’re writing to HDFS, and thus the load you’re putting on it while catching up & checkpointing is too high.

If so, then you could monitor the TotalLoad metric (FSNamesystem) in your source, and throttle back the emitting of file paths when this (empirically) gets too high.
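A minimal sketch of such a probe, reading TotalLoad from the NameNode's JMX HTTP servlet; the host:port (50070 is the Hadoop 2.x default web port), the threshold and the polling interval are assumptions to adapt to your cluster:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative helper: fetches the FSNamesystem TotalLoad metric from the
// NameNode JMX servlet so a source can throttle emission while HDFS is busy.
public class NameNodeLoadProbe {

    private static final Pattern TOTAL_LOAD = Pattern.compile("\"TotalLoad\"\\s*:\\s*(\\d+)");

    public static long fetchTotalLoad(String nameNodeHttpAddress) throws Exception {
        URL url = new URL("http://" + nameNodeHttpAddress
                + "/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem");
        StringBuilder json = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                json.append(line);
            }
        }
        Matcher m = TOTAL_LOAD.matcher(json);
        if (!m.find()) {
            throw new IllegalStateException("TotalLoad not found in JMX output");
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) throws Exception {
        // e.g. pause emitting file paths while the cluster looks overloaded
        while (fetchTotalLoad("namenode.example.com:50070") > 5000) {   // threshold is a guess
            Thread.sleep(10_000);
        }
    }
}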

2. I’m wondering what all you are checkpointing, and why.

E.g. if this is just an ETL-ish workflow to pull files, parse them, and write out (transformed) results, then you could in theory just checkpoint which files have been processed.

This means catching up after a failure could take more time, but your checkpoint size will be trivially small.

— Ken


> On Mar 1, 2019, at 5:04 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
> 
> Hi,
>  
> I think I should go into more details to explain my use case.
> I have one non-parallel source (parallelism = 1) that lists binary files in an HDFS directory. The DataSet emitted by the source is a set of file names, not file contents. These filenames are rebalanced and sent to workers (parallelism = 15) that use a flatmapper to open each file, read it, decode it, and send records (forward mode) to the sinks (with a few 1-to-1 mappings in between). So the flatmap operation is time-consuming, as each file is more than 200 MB; the flatmapper emits millions of records to the sink for one source record (filename).
> 
> The rebalancing, occurring at the file name level, does not use much I/O, and I cannot use one-to-one mode at that point if I want some parallelism, since I have only one source.
> 
> I did not put file decoding directly in the sources because I have no good way to distribute files to sources without a controller (the input directory is unique, and filenames are random and cannot easily be “attributed” to one particular source instance).
> Alternatively, I could have used a dispatcher daemon, separate from the streaming app, that distributes files to various directories, each directory being associated with one Flink source instance, and put the file reading & decoding directly in the source, but that seemed more complex to code and operate than the filename source. Would it have been better from the checkpointing perspective?
> 
> About the ungraceful source sleep(), is there a way, programmatically, to know the “load” of the app, or to determine whether checkpointing is taking too much time, so that I throttle only when needed?
>  
> Thanks,
> Arnaud

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


RE: Checkpoints and catch-up burst (heavy back pressure)

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,
I like the idea, will give it a try.
Thanks,
Arnaud

From: Stephen Connolly <st...@gmail.com>
Sent: Tuesday, March 5, 2019 13:55
To: LINZ, Arnaud <AL...@bouyguestelecom.fr>
Cc: zhijiang <wa...@aliyun.com>; user <us...@flink.apache.org>
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)






Re: Checkpoints and catch-up burst (heavy back pressure)

Posted by Stephen Connolly <st...@gmail.com>.
On Tue, 5 Mar 2019 at 12:48, Stephen Connolly <
stephen.alan.connolly@gmail.com> wrote:

>
>
> On Fri, 1 Mar 2019 at 13:05, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
>
>> I did not put file decoding directly in the sources because I have no
>> good way to distribute files to sources without a controller (input
>> directory is unique, filenames are random and cannot be “attributed” to one
>> particular source instance easily).
>>
>
> Crazy idea: If you know the task number and the number of tasks, you can
> hash the filename using a shared algorithm (e.g. md5 or sha1 or crc32) and
> then just check modulo number of tasks == task number.
>
> That would let you run the file listing in parallel without sharing state,
> which would allow file decoding directly in the sources.
>

if you extend RichParallelSourceFunction you will have:

        int index = getRuntimeContext().getIndexOfThisSubtask();
        int count = getRuntimeContext().getNumberOfParallelSubtasks();

then a hash function like:

    private static int hash(String string) {
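        // DigestUtils.sha1(String) comes from Apache Commons Codec (org.apache.commons.codec.digest.DigestUtils)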
        int result = 0;
        for (byte b : DigestUtils.sha1(string)) {
            result = result * 31 + b;
        }
        return result;
    }

and just compare the filename like so:

for (String filename: listFiles()) {
  if (Math.floorMod(hash(filename), count) != index) {
    continue;
  }
  // this is our file
  ...
}

Note: if you know the file name patterns, you should tune the hash function
to distribute them evenly. The SHA1 with prime reduction of the bytes is ok
for general levelling... but may be poor over 15 buckets with your typical
data set of filenames
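With that filter in place, each of the 15 parallel source subtasks could list the same directory and keep only its own share of the files, e.g. something like env.addSource(new HashPartitionedFileNameSource()).setParallelism(15) (the class name here is hypothetical), which would also let the file reading & decoding move into the source itself.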


>
>
>> Alternatively, I could have used a dispatcher daemon separated from the
>> streaming app that distribute files to various directories, each directory
>> being associated with a flink source instance, and put the file reading &
>> decoding directly in the source, but that seemed more complex to code and
>> exploit than the filename source. Would it have been better from the
>> checkpointing perspective?
>>
>>
>>
>> About the ungraceful source sleep(), is there a way, programmatically, to
>> know the “load” of the app, or to determine if checkpointing takes too much
>> time, so that I can do it only on purpose?
>>
>>
>>
>> Thanks,
>>
>> Arnaud
>>
>>
>>
>> *De :* zhijiang <wa...@aliyun.com>
>> *Envoyé :* vendredi 1 mars 2019 04:59
>> *À :* user <us...@flink.apache.org>; LINZ, Arnaud <
>> ALINZ@bouyguestelecom.fr>
>> *Objet :* Re: Checkpoints and catch-up burst (heavy back pressure)
>>
>>
>>
>> Hi Arnaud,
>>
>>
>>
>> Thanks for the further feedbacks!
>>
>>
>>
>> For option1: 40min still does not makes sense, which indicates it might
>> take more time to finish checkpoint in your case. I also experienced some
>> scenarios of catching up data to take several hours to finish one
>> checkpoint. If the current checkpoint expires because of timeout, the next
>> new triggered checkpoint might still be failed for timeout. So it seems
>> better to wait the current checkpoint until finishes, not expires it,
>> unless we can not bear this long time for some reasons such as wondering
>> failover to restore more data during this time.
>>
>>
>>
>> For option2: The default network setting should be make sense. The lower
>> values might cause performance regression and the higher values would
>> increase the inflighing buffers and checkpoint delay more seriously.
>>
>>
>>
>> For option3: If the resource is limited, it is still not working on your
>> side.
>>
>>
>>
>> It is an option and might work in your case for sleeping some time in
>> source as you mentioned, although it seems not a graceful way.
>>
>>
>>
>> I think there are no data skew in your case to cause backpressure,
>> because you used the rebalance mode as mentioned. Another option might use
>> the forward mode which would be better than rebalance mode if possible in
>> your case. Because the source and downstream task is one-to-one in forward
>> mode, so the total flighting buffers are 2+2+8 for one single downstream
>> task before barrier. If in rebalance mode, the total flighting buffer would
>> be (a*2+a*2+8) for one single downstream task (`a` is the parallelism of
>> source vertex), because it is all-to-all connection. The barrier alignment
>> takes more time in rebalance mode than forward mode.
>>
>>
>>
>> Best,
>>
>> Zhijiang
>>
>> ------------------------------------------------------------------
>>
>> From:LINZ, Arnaud <AL...@bouyguestelecom.fr>
>>
>> Send Time:2019年3月1日(星期五) 00:46
>>
>> To:zhijiang <wa...@aliyun.com>; user <us...@flink.apache.org>
>>
>> Subject:RE: Checkpoints and catch-up burst (heavy back pressure)
>>
>>
>>
>> Update :
>>
>> Option  1 does not work. It still fails at the end of the timeout, no
>> matter its value.
>>
>> Should I implement a “bandwidth” management system by using artificial
>> Thread.sleep in the source depending on the back pressure ?
>>
>>
>>
>> *De :* LINZ, Arnaud
>> *Envoyé :* jeudi 28 février 2019 15:47
>> *À :* 'zhijiang' <wa...@aliyun.com>; user <
>> user@flink.apache.org>
>> *Objet :* RE: Checkpoints and catch-up burst (heavy back pressure)
>>
>>
>>
>> Hi Zhihiang,
>>
>>
>>
>> Thanks for your feedback.
>>
>>    - I’ll try option 1 ; time out is 4min for now, I’ll switch it to
>>    40min and will let you know. Setting it higher than 40 min does not make
>>    much sense since after 40 min the pending output is already quite large.
>>    - Option 3 won’t work ; I already take too many ressources, and as my
>>    source is more or less a hdfs directory listing, it will always be far
>>    faster than any mapper that reads the file and emits records based on its
>>    content or sink that store the transformed data, unless I put “sleeps” in
>>    it (but is this really a good idea?)
>>    - Option 2: taskmanager.network.memory.buffers-per-channel and
>>    taskmanager.network.memory.buffers-per-gate are currently unset in my
>>    configuration (so to their default of 2 and 8), but for this streaming app
>>    I have very few exchanges between nodes (just a rebalance after the source
>>    that emit file names, everything else is local to the node). Should I
>>    adjust their values nonetheless ? To higher or lower values ?
>>
>> Best,
>>
>> Arnaud
>>
>> *De :* zhijiang <wa...@aliyun.com>
>> *Envoyé :* jeudi 28 février 2019 10:58
>> *À :* user <us...@flink.apache.org>; LINZ, Arnaud <
>> ALINZ@bouyguestelecom.fr>
>> *Objet :* Re: Checkpoints and catch-up burst (heavy back pressure)
>>
>>
>>
>> Hi Arnaud,
>>
>>
>>
>> I think there are two key points. First the checkpoint barrier might be
>> emitted delay from source under high backpressure for synchronizing lock.
>>
>> Second the barrier has to be queued in flighting data buffers, so the
>> downstream task has to process all the buffers before barriers to trigger
>> checkpoint and this would take some time under back pressure.
>>
>>
>>
>> There has three ways to work around:
>>
>> 1. Increase the checkpoint timeout avoid expire in short time.
>>
>> 2. Decrease the setting of network buffers to decrease the amount of
>> flighting buffers before barrier, you can check the config of
>>  "taskmanager.network.memory.buffers-per-channel" and
>> "taskmanager.network.memory.buffers-per-gate".
>>
>> 3. Adjust the parallelism such as increasing it for sink vertex in order
>> to process source data faster, to avoid backpressure in some extent.
>>
>>
>>
>> You could check which way is suitable for your scenario and may have a
>> try.
>>
>>
>>
>> Best,
>>
>> Zhijiang
>>
>> ------------------------------------------------------------------
>>
>> From:LINZ, Arnaud <AL...@bouyguestelecom.fr>
>>
>> Send Time:2019年2月28日(星期四) 17:28
>>
>> To:user <us...@flink.apache.org>
>>
>> Subject:Checkpoints and catch-up burst (heavy back pressure)
>>
>>
>>
>> Hello,
>>
>>
>>
>> I have a simple streaming app that get data from a source and store it to
>> HDFS using a sink similar to the bucketing file sink. Checkpointing mode is
>> “exactly once”.
>>
>> Everything is fine on a “normal” course as the sink is faster than the
>> source; but when we stop the application for a while and then restart it,
>> we have a catch-up burst to get all the messages emitted in the meanwhile.
>>
>> During this burst, the source is faster than the sink, and all
>> checkpoints fail (time out) until the source has been totally caught up.
>> This is annoying because the sink does not “commit” the data before a
>> successful checkpoint is made, and so the app release all the “catch up”
>> data as a atomic block that can be huge if the streaming app was stopped
>> for a while, adding an unwanted stress to all the following hive treatments
>> that use the data provided in micro batches and to the Hadoop cluster.
>>
>>
>>
>> How should I handle the situation? Is there something special to do to
>> get checkpoints even during heavy load?
>>
>>
>>
>> The problem does not seem to be new, but I was unable to find any
>> practical solution in the documentation.
>>
>>
>>
>> Best regards,
>>
>> Arnaud
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> ------------------------------
>>
>>
>> L'intégrité de ce message n'étant pas assurée sur internet, la société
>> expéditrice ne peut être tenue responsable de son contenu ni de ses pièces
>> jointes. Toute utilisation ou diffusion non autorisée est interdite. Si
>> vous n'êtes pas destinataire de ce message, merci de le détruire et
>> d'avertir l'expéditeur.
>>
>> The integrity of this message cannot be guaranteed on the Internet. The
>> company that sent this message cannot therefore be held liable for its
>> content nor attachments. Any unauthorized use or dissemination is
>> prohibited. If you are not the intended recipient of this message, then
>> please delete it and notify the sender.
>>
>>
>>
>>
>>
>

Re: Checkpoints and catch-up burst (heavy back pressure)

Posted by Stephen Connolly <st...@gmail.com>.
On Fri, 1 Mar 2019 at 13:05, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:

> Hi,
>
>
>
> I think I should go into more details to explain my use case.
>
> I have one non-parallel source (parallelism = 1) that lists binary files in
> an HDFS directory. The data emitted by the source is a set of file names,
> not file contents. These filenames are rebalanced and sent to workers
> (parallelism = 15) that use a flatmapper that opens the file, reads it,
> decodes it, and sends records (forward mode) to the sinks (with a few
> 1-to-1 mappings in-between). So the flatmap operation is a time-consuming
> one, as the files are more than 200 MB each; the flatmapper will emit
> millions of records to the sink for one source record (filename).
>
>
>
> The rebalancing, occurring at the file-name level, does not use much I/O,
> and I cannot use one-to-one mode at that point if I want some parallelism,
> since I have only one source.
>
>
>
> I did not put file decoding directly in the sources because I have no good
> way to distribute files to sources without a controller (input directory is
> unique, filenames are random and cannot be “attributed” to one particular
> source instance easily).
>

Crazy idea: if you know the task number and the number of tasks, you can
hash the filename using a shared algorithm (e.g. md5, sha1 or crc32) and
then just check whether hash modulo number-of-tasks == task number.

That would let you run the file listing in parallel without sharing any
state, which would in turn allow file decoding directly in the sources.
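
A minimal sketch of that idea (assuming Flink's RichParallelSourceFunction and
Hadoop's FileSystem API; any shared deterministic hash works, plain hashCode()
is used here instead of md5/crc32, and tracking of already-emitted files plus
checkpoint state are omitted for brevity):

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Each parallel instance lists the same HDFS directory and keeps only the
 * file names whose hash falls on its own subtask index, so the instances
 * need no coordination or shared state.
 */
public class HashPartitionedFileNameSource extends RichParallelSourceFunction<String> {

    private final String directory;
    private volatile boolean running = true;

    public HashPartitionedFileNameSource(String directory) {
        this.directory = directory;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        final int subtask = getRuntimeContext().getIndexOfThisSubtask();
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final FileSystem fs = FileSystem.get(new Configuration());

        while (running) {
            for (FileStatus status : fs.listStatus(new Path(directory))) {
                String fileName = status.getPath().toString();
                // Deterministic "hash modulo number of tasks == task number" check:
                // exactly one subtask claims each file name.
                if (Math.floorMod(fileName.hashCode(), parallelism) == subtask) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(fileName);
                    }
                }
            }
            Thread.sleep(10_000); // poll the directory periodically
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}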



Re: Checkpoints and catch-up burst (heavy back pressure)

Posted by zhijiang <wa...@aliyun.com>.
Hi Arnaud,

I think I understand your particular use case now, based on your further explanation. As you said, the source can easily emit the whole set of file names into the network buffers, because each file name is very small while the flatmap/sink processing is slow. Then, when a checkpoint is triggered, the barrier ends up behind the whole set of file names, which means the sink cannot receive the barrier until it has read and written all the corresponding files.

So the proper solution in your case is to control the emit rate on the source side based on the sink's catch-up progress, in order to avoid many files being queued in front of the barriers. This is the right way to try, and I hope your two-parameter solution works.

Best,
Zhijiang




RE: Checkpoints and catch-up burst (heavy back pressure)

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hello,
When I think about it, I realize that a barrier emitted by the source ends up behind the whole backlog of file names, and therefore the checkpoint will never complete until the sinks have caught up.
The simplest way to deal with it without refactoring is to add two parameters to the source: a file-count threshold that detects catch-up mode, and a max-files-per-second limit applied while that mode is on, set slightly below the natural catch-up rate.
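
For illustration, a rough sketch of such a throttled file-name source (class and
method names like ThrottledFileNameSource and listNewFiles() are placeholders,
and the bookkeeping of already-emitted files is left out):

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * Two-parameter throttle: when the backlog of new files exceeds a threshold we
 * are in "catch-up mode" and cap emission at maxFilesPerSecond, chosen slightly
 * below the natural catch-up rate so that barriers are not buried behind the
 * whole backlog.
 */
public class ThrottledFileNameSource extends RichSourceFunction<String> {

    private final int catchUpThreshold;       // backlog size that switches catch-up mode on
    private final double maxFilesPerSecond;   // cap applied while catching up
    private volatile boolean running = true;

    public ThrottledFileNameSource(int catchUpThreshold, double maxFilesPerSecond) {
        this.catchUpThreshold = catchUpThreshold;
        this.maxFilesPerSecond = maxFilesPerSecond;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            List<String> backlog = listNewFiles();
            boolean catchingUp = backlog.size() > catchUpThreshold;
            long emitIntervalMs = catchingUp ? (long) (1000.0 / maxFilesPerSecond) : 0L;

            for (String fileName : backlog) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(fileName);
                }
                if (emitIntervalMs > 0) {
                    Thread.sleep(emitIntervalMs); // pace emission during catch-up
                }
            }
            Thread.sleep(5_000); // pause between directory listings
        }
    }

    private List<String> listNewFiles() {
        // Placeholder: list the HDFS directory and filter out already-emitted files.
        return Collections.emptyList();
    }

    @Override
    public void cancel() {
        running = false;
    }
}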




RE: Checkpoints and catch-up burst (heavy back pressure)

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

I think I should go into more detail to explain my use case.
I have one non-parallel source (parallelism = 1) that lists binary files in an HDFS directory. The data emitted by the source is a set of file names, not file contents. These filenames are rebalanced and sent to workers (parallelism = 15) that use a flatmapper that opens the file, reads it, decodes it, and sends records (forward mode) to the sinks (with a few 1-to-1 mappings in-between). So the flatmap operation is a time-consuming one, as the files are more than 200 MB each; the flatmapper will emit millions of records to the sink for one source record (filename).
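
For reference, the shape of that job is roughly the following (the source,
decoder and sink below are trivial stand-ins, not the real classes):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Skeleton of the wiring described above, with stand-ins so the job shape is visible.
public class PipelineSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hdfs:///in/file-1.bin", "hdfs:///in/file-2.bin") // stand-in for the single file-name source
           .setParallelism(1)
           .rebalance()                                                    // file names are rebalanced to the workers
           .flatMap((FlatMapFunction<String, String>) (fileName, out) -> {
               // stand-in for the decoder: the real flatmapper reads the ~200 MB file
               // and emits millions of records per file name
               out.collect("record-from-" + fileName);
           })
           .returns(String.class)
           .setParallelism(15)
           .print()                                                        // stand-in for the bucketing HDFS sink (forward connection)
           .setParallelism(15);

        env.execute("catch-up demo skeleton");
    }
}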

The rebalancing, occurring at the file-name level, does not use much I/O, and I cannot use one-to-one mode at that point if I want some parallelism, since I have only one source.

I did not put file decoding directly in the sources because I have no good way to distribute files to sources without a controller (input directory is unique, filenames are random and cannot be “attributed” to one particular source instance easily).
Alternatively, I could have used a dispatcher daemon, separate from the streaming app, that distributes files to various directories, each directory being associated with one Flink source instance, and put the file reading & decoding directly in the source; but that seemed more complex to code and operate than the filename source. Would it have been better from the checkpointing perspective?

About the ungraceful source sleep(): is there a way, programmatically, to know the “load” of the app, or to determine whether checkpointing is taking too much time, so that I only apply the sleep when it is actually needed?
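
There is no single "load" figure exposed to user functions as far as I know, but
one programmatic approximation (a sketch only, with made-up threshold values) is
to have the source implement CheckpointedFunction and CheckpointListener and
measure the time between emitting a barrier in snapshotState() and the matching
notifyCheckpointComplete(); when that gap grows, checkpoints are struggling and
the source can back off:

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * Sketch: the source measures how long checkpoints take from its own point of
 * view (barrier emitted at the source -> global completion) and slows emission
 * down when that duration grows. Threshold and sleep values are arbitrary.
 */
public abstract class CheckpointAwareSource extends RichSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private volatile long lastTriggerTs = -1L;
    private volatile long lastCheckpointDurationMs = 0L;

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        lastTriggerTs = System.currentTimeMillis();   // barrier emitted by this source
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (lastTriggerTs > 0) {
            lastCheckpointDurationMs = System.currentTimeMillis() - lastTriggerTs;
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // no state to restore in this sketch
    }

    /** Called from run() before emitting each file name. */
    protected void maybeThrottle() throws InterruptedException {
        if (lastCheckpointDurationMs > 60_000) {      // arbitrary "too slow" threshold
            Thread.sleep(1_000);                      // back off while checkpoints struggle
        }
    }
}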

Thanks,
Arnaud
