Posted to user@flink.apache.org by Padarn Wilson <pa...@gmail.com> on 2019/03/02 07:19:52 UTC

Re: Setting source vs sink vs window parallelism with data increase

Hi all again - following up on this, I think I've identified my problem as
being something else, but would appreciate it if anyone can offer advice.

After running my stream for some time, I see that my garbage collector for
the old generation starts to take a very long time:
[image: Screen Shot 2019-03-02 at 3.01.57 PM.png]
Here the *purple line is young generation time*, which is ever increasing
but grows slowly, while the *blue is old generation*.
This in itself is not a problem, but as soon as the next checkpoint is
triggered after this happens you see the following:
[image: Screen Shot 2019-03-02 at 3.02.48 PM.png]
It looks like the checkpoint hits a cap, but this is only because the
checkpoints start to time out and fail (these are the alignment times per
operator).

I do notice that my state is growing quite large over time, but I don't
have a good understanding of what would cause this to happen with the JVM
old generation metric, which appears to be the leading metric before a
problem is noticed. Other metrics such as network buffers also show that at
the checkpoint time things start to go haywire and the situation never
recovers.

Thanks

On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pa...@gmail.com> wrote:

> Hi all,
>
> I'm trying to process many records, and I have an expensive operation I'm
> trying to optimize. Simplified it is something like:
>
> Data: (key1, count, time)
>
> Source -> Map(x => (x, newKeyList(x.key1)))
>             -> Flatmap(x => x._2.map(y => (y, x._1.count, x._1.time)))
>             -> Keyby(_.key1).TumblingWindow().apply..
>             -> Sink
>
> In the Map -> Flatmap, what is happening is that each key is mapped to a
> set of keys, and each of those is then used as the new key. This effectively
> increases the size of the stream by 16x.
>
> What I am trying to figure out is how to set the parallelism of my
> operators. I see in some comments that people suggest your source, sink and
> aggregation should have different parallelism, but I'm not clear on exactly
> why, or what this means for CPU utilization.
> (see for example
> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
> )
>
> Also, it isn't clear to me the best way to handle this increase in data
> within the stream itself.
>
> Thanks
>
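
As a rough illustration of the question above, the per-subtask load on either side of a 16x fan-out can be worked out with simple arithmetic. This is only a back-of-the-envelope sketch in plain Java: the input rate and the parallelism values are hypothetical, it assumes records are spread evenly across subtasks, and it ignores the cost of the window function itself. Only the 16x factor comes from the post.

```java
public class FanOutSketch {
    // Records per second that each parallel subtask must handle,
    // assuming records are spread evenly across subtasks.
    static double perSubtaskRate(double recordsPerSecond, int parallelism) {
        return recordsPerSecond / parallelism;
    }

    public static void main(String[] args) {
        double sourceRate = 10_000.0; // hypothetical input rate, records/s
        int fanOut = 16;              // from the post: the stream grows by 16x
        double windowInputRate = sourceRate * fanOut;

        int sourceParallelism = 4;    // hypothetical
        int windowParallelism = 4;    // hypothetical

        System.out.println("source, per subtask: "
                + perSubtaskRate(sourceRate, sourceParallelism) + " records/s");
        System.out.println("window, per subtask: "
                + perSubtaskRate(windowInputRate, windowParallelism) + " records/s");
        // Parallelism the window would need to see the same per-subtask
        // rate as the source does.
        System.out.println("window parallelism to match: "
                + sourceParallelism * fanOut);
    }
}
```

With equal parallelism on both sides, each window subtask sees 16 times the record rate of each source subtask, which is one reason operators downstream of a fan-out are often given a higher parallelism (in Flink, via `setParallelism(n)` on the individual operator).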

Re: Setting source vs sink vs window parallelism with data increase

Posted by Piotr Nowojski <pi...@ververica.com>.
No problem and it’s good to hear that you managed to solve the problem.

Piotrek 

> On 23 Mar 2019, at 12:49, Padarn Wilson <pa...@gmail.com> wrote:
> 
> Well.. it turned out I was registering millions of timers by accident, which was why garbage collection was blowing up. Oops. Thanks for your help again.
> 
> On Wed, Mar 6, 2019 at 9:44 PM Padarn Wilson <padarn@gmail.com> wrote:
> Thanks a lot for your suggestion. I’ll dig into it and update for the mailing list if I find anything useful.
> 
> Padarn
> 
> On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski <piotr@ververica.com> wrote:
> Re-adding user mailing list.
> 
> 
> Hi,
> 
> If it is a GC issue, only GC logs or some JVM memory profilers (like Oracle’s Mission Control) can lead you to the solution. Once you confirm that it’s a GC issue, there are numerous resources online on how to analyse the cause of the problem. For that, it is difficult to use CPU profiling/Flink Metrics, since GC issues caused by one thread can cause performance bottlenecks in other, unrelated places.
> 
> If that’s not a GC issue, you can use Flink metrics (like the number of buffered input/output data) to find the Task that’s causing a bottleneck. Then you can use a CPU profiler to analyse why that is happening.
> 
> Piotrek
> 
>> On 6 Mar 2019, at 02:52, Padarn Wilson <padarn@gmail.com> wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks for your feedback. Makes sense about the checkpoint barriers - this definitely could be the cause of a problem.
>> 
>>> I would advise profiling your job to find out what’s going on.
>> 
>> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for tools with which to do this?
>> 
>> The main thing I'm trying to pin down is:
>> 1) Is it the downstream processing from the expansion of records that causes a problem, or
>> 2) Is it the shuffle of all the records after the expansion which is taking a long time - if so, is there anything I can do to mitigate this other than trying to ensure less shuffle?
>> 
>> Thanks,
>> Padarn
>> 
>> 
>> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <piotr@ververica.com> wrote:
>> Hi,
>> 
>>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time? 
>> 
>> The problem is that when Flink starts a checkpoint and injects checkpoint barriers, those checkpoint barriers travel through the Job Graph. The quicker they can do that the better. How long that takes depends on the amount of data buffered ahead of the checkpoint barriers (currently all such records must be processed before a checkpoint barrier is passed downstream). The more buffered records there are, and the more time it takes to process them, the longer the checkpoint takes. Obviously if one stage in the job is multiplying the number of records, it can in a way multiply the amount of “buffered work” that needs to be processed before the checkpoint barriers pass through.
>> 
>> However it might not be the case for you. To analyse what’s going on you would need to look at various Flink metrics, like checkpoint times, back-pressured tasks, the state of the output/input buffers of the tasks, etc. However #2, those are secondary issues. First of all you should try to pinpoint the cause of the long GC pauses. If it comes from your code, you should fix this first. If that either isn’t the issue or doesn’t solve it, generally speaking I would advise profiling your job to find out what’s going on.
>> 
>> Piotrek
>> 
>>> On 5 Mar 2019, at 02:00, Padarn Wilson <padarn@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).
>>> 
>>>> Generally speaking Flink might not be the best if you have records fan out; this may significantly increase checkpointing time.
>>> 
>>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time? 
>>> 
>>>> However you might want to first identify what’s causing long GC times.
>>> 
>>> My current plan is to try and enable GC logs and see if I can get something meaningful from them. 
>>> 
>>> Thanks a lot,
>>> Padarn 
>>> 
>>> 
>>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <piotr@ververica.com> wrote:
>>> Hi,
>>> 
>>> What Flink version are you using?
>>> 
>>> Generally speaking Flink might not be the best if you have records fan out; this may significantly increase checkpointing time.
>>> 
>>> However you might want to first identify what’s causing long GC times. If there are long GC pauses, this should be the first thing to fix.
>>> 
>>> Piotrek
>>> 


Re: Setting source vs sink vs window parallelism with data increase

Posted by Padarn Wilson <pa...@gmail.com>.
Well.. it turned out I was registering millions of timers by accident,
which was why garbage collection was blowing up. Oops. Thanks for your help
again.
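
The thread does not show the code that registered the timers, so the following is only a guess at a common pattern. Flink keeps at most one timer per key and timestamp, so registering a timer per record at millisecond resolution can create a very large number of distinct timers, while rounding the timestamp to a coarser resolution ("timer coalescing") collapses most of them. A minimal sketch in plain Java, with no Flink dependency and with hypothetical names throughout, that models the timers as a set of (key, timestamp) pairs:

```java
import java.util.HashSet;
import java.util.Set;

public class TimerCoalescingSketch {
    // Round a timer timestamp down to a coarser resolution so that many
    // nearby timestamps collapse onto the same value.
    static long coalesce(long timestampMillis, long resolutionMillis) {
        return (timestampMillis / resolutionMillis) * resolutionMillis;
    }

    // Count the distinct (key, timestamp) pairs that would be registered
    // for one key, given one timer request per record.
    static int distinctTimers(String key, long[] eventTimes,
                              long timeoutMillis, long resolutionMillis) {
        Set<String> timers = new HashSet<>();
        for (long t : eventTimes) {
            timers.add(key + "@" + coalesce(t + timeoutMillis, resolutionMillis));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long[] eventTimes = new long[10_000];
        for (int i = 0; i < eventTimes.length; i++) {
            eventTimes[i] = i; // one record per millisecond, for 10 seconds
        }
        // Millisecond resolution: every record gets its own timer.
        System.out.println("per-record timers: "
                + distinctTimers("k", eventTimes, 60_000, 1));
        // One-second resolution: records in the same second share a timer.
        System.out.println("coalesced timers:  "
                + distinctTimers("k", eventTimes, 60_000, 1_000));
    }
}
```

In a real job the rounded value would be what gets passed to `registerEventTimeTimer` or `registerProcessingTimeTimer` on the timer service; whether that matches the accidental registration described above is not something the thread confirms.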


Re: Setting source vs sink vs window parallelism with data increase

Posted by Padarn Wilson <pa...@gmail.com>.
Thanks a lot for your suggestion. I’ll dig into it and update for the
mailing list if I find anything useful.

Padarn


Re: Setting source vs sink vs window parallelism with data increase

Posted by Piotr Nowojski <pi...@ververica.com>.
Re-adding user mailing list.


Hi,

If it is a GC issue, only GC logs or some JVM memory profilers (like Oracle’s Mission Control) can lead you to the solution. Once you confirm that it’s a GC issue, there are numerous resources online on how to analyse the cause of the problem. For that, it is difficult to use CPU profiling/Flink Metrics, since GC issues caused by one thread can cause performance bottlenecks in other, unrelated places.

If that’s not a GC issue, you can use Flink metrics (like the number of buffered input/output data) to find the Task that’s causing a bottleneck. Then you can use a CPU profiler to analyse why that is happening.

Piotrek
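
For a first confirmation that GC is involved, before turning to GC logs or a memory profiler, the JVM's own per-collector counters can be read through the standard `java.lang.management` API. The sketch below is plain Java with hypothetical class and method names; it is not a substitute for GC logs, and the collector names it prints depend on which garbage collector the JVM is running with. Flink's JVM garbage collector metrics are likely derived from the same MXBeans, but that is an assumption here rather than something stated in the thread.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.Map;

public class GcTimeSketch {
    // Snapshot of cumulative GC time in milliseconds per collector
    // (typically one young-generation and one old-generation collector).
    // A value of -1 means the collector does not report a time.
    static Map<String, Long> gcTimeMillis() {
        Map<String, Long> result = new LinkedHashMap<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            result.put(gc.getName(), gc.getCollectionTime());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> before = gcTimeMillis();

        // Allocate short-lived garbage so the collectors have something to do.
        long allocated = 0;
        for (int i = 0; i < 200_000; i++) {
            byte[] junk = new byte[1024];
            allocated += junk.length;
        }

        Map<String, Long> after = gcTimeMillis();
        for (Map.Entry<String, Long> entry : after.entrySet()) {
            long delta = entry.getValue() - before.getOrDefault(entry.getKey(), 0L);
            System.out.println(entry.getKey() + ": +" + delta + " ms GC time");
        }
        System.out.println("allocated bytes: " + allocated);
    }
}
```

Sampling these counters periodically and comparing the deltas for the young and old collectors gives the same kind of curves as the graphs discussed earlier in the thread.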



Re: Setting source vs sink vs window parallelism with data increase

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi,

What Flink version are you using?

Generally speaking Flink might not be the best if you have records fan out; this may significantly increase checkpointing time.

However you might want to first identify what’s causing long GC times. If there are long GC pauses, this should be the first thing to fix.

Piotrek
