Posted to user@flink.apache.org by Darshan Singh <da...@gmail.com> on 2018/08/29 08:19:49 UTC

Backpressure? for Batches

I have run into backpressure in streaming jobs, and I was wondering
whether the same can happen in batch jobs.

In theory it should be possible. But in the Web UI, the backpressure tab
for batch jobs only showed the task status, with no status such as "OK".

So I was wondering whether backpressure is a thing for batch jobs. If
yes, how do we reduce it, especially when reading from HDFS?

Thanks

Re: Backpressure? for Batches

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
You can check the log for the stack trace of the OOM; that may help us confirm the cause.
Alternatively, you can dump the heap after the OOM and analyze the memory usage.

Best,
Zhijiang

Re: Backpressure? for Batches

Posted by Darshan Singh <da...@gmail.com>.
Thanks,

I thought the group by was causing the OOM, but it is mentioned that the
sort spills to disk, so that should not be able to cause it. So I wondered
whether, due to backpressure, some of the data read from HDFS was being
kept in memory because it had not been consumed yet, and whether that was
causing the OOM. It seems that is not possible either, so I need to look
again at what could be causing the OOM.

Thanks


Re: Backpressure? for Batches

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
Chesnay is right. For a batch job there are two ways of notifying that a result is consumable: one is when the upstream emits its first record, and the other is when the upstream has finished all its records (blocking mode).

In your case, the slow group-by node will trigger backpressure and block the upstream tasks all the way back to the source node. Normally this will not cause an OOM from caching in-flight buffers, because those buffers are managed by the framework and cannot grow without limit. The only exception I have experienced is the Netty buffer pool, which may cause one.

One possible way to reduce backpressure is to increase the parallelism of the slow node (the group by in your case) or to decrease the parallelism of the fast node (the source in your case).
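
A minimal sketch of the point about bounded in-flight buffers, in plain Python rather than Flink code. It assumes a fast source and a slow consumer connected by a fixed-size queue; the names run_pipeline, buffer_capacity and consumer_delay are made up for illustration and are not Flink settings. When the queue is full the source blocks, so the number of buffered records never exceeds the capacity.

```python
import queue
import threading
import time


def run_pipeline(num_records, buffer_capacity, consumer_delay=0.001):
    """Feed a slow consumer from a fast source through a bounded buffer.

    Returns (consumed_records, max_in_flight), where max_in_flight is the
    largest buffer occupancy the source observed right after a put.
    """
    buffer = queue.Queue(maxsize=buffer_capacity)  # fixed-size, like a bounded buffer pool
    sentinel = object()  # marks end of input
    consumed = []
    max_in_flight = 0

    def source():
        nonlocal max_in_flight
        for i in range(num_records):
            # put() blocks while the buffer is full: this is the backpressure.
            buffer.put(i)
            max_in_flight = max(max_in_flight, buffer.qsize())
        buffer.put(sentinel)

    def slow_group_by():
        while True:
            item = buffer.get()
            if item is sentinel:
                break
            time.sleep(consumer_delay)  # simulate a slow downstream operator
            consumed.append(item)

    threads = [threading.Thread(target=source), threading.Thread(target=slow_group_by)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed, max_in_flight


if __name__ == "__main__":
    records, peak = run_pipeline(num_records=50, buffer_capacity=4)
    print(len(records), "records consumed, peak buffered:", peak)
```

In this toy model the source is slowed to the consumer's pace instead of piling up records in memory, which is the behaviour described above for framework-managed buffers.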

Best,
Zhijiang

Re: Backpressure? for Batches

Posted by Darshan Singh <da...@gmail.com>.
Thanks. Now back to my question: how can I make the source read from HDFS
no faster than my map or group by can consume? Is there some configuration
that says to read only 10000 rows, stop, and then read again? Otherwise
the source will keep sending data, or holding it in some sort of buffer,
and the job will run out of memory. Or is setting a different parallelism
for the read the way to handle this?

Thanks


Re: Backpressure? for Batches

Posted by Chesnay Schepler <ch...@apache.org>.
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled /when
there is data to be consumed/, i.e. once the first record has been
emitted by the previous operator. As such, back-pressure exists in batch
just like in streaming.
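
As a rough illustration only (plain Python generators, not Flink's scheduler), the sketch below contrasts a consumer that starts as soon as the first record is emitted with one that waits for the producer to finish, which is the blocking mode mentioned elsewhere in this thread. The function names are hypothetical.

```python
def source(n, log):
    """Emit n records one at a time, logging each emission."""
    for i in range(n):
        log.append(f"emit {i}")
        yield i


def pipelined(n):
    """Consumer runs alongside the producer: each record is consumed
    right after it is emitted, so a slow consumer slows the producer."""
    log = []
    for rec in source(n, log):
        log.append(f"consume {rec}")
    return log


def blocking(n):
    """Consumer starts only after the producer has emitted everything,
    so the full intermediate result is materialized first."""
    log = []
    materialized = list(source(n, log))
    for rec in materialized:
        log.append(f"consume {rec}")
    return log


if __name__ == "__main__":
    print(pipelined(2))  # ['emit 0', 'consume 0', 'emit 1', 'consume 1']
    print(blocking(2))   # ['emit 0', 'emit 1', 'consume 0', 'consume 1']
```

Only the interleaved case couples the producer's pace to the consumer's, which is where back-pressure can appear.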


Re: Backpressure? for Batches

Posted by Darshan Singh <da...@gmail.com>.
Thanks,

My job is simple. I am using the Table API:
1. Read from HDFS.
2. Deserialize JSON to POJOs and convert to a table.
3. Group by some columns.
4. Convert back to a DataSet and write back to HDFS.

In the Web UI I can see at least the first 3 running concurrently, which
sort of makes sense. From your answer I understood that Flink will first
do step 1, then once that is completed do the map (or the grouping as
well), then the grouping, and finally the write, so only 1 task should be
running at a time. This doesn't seem right to me, or I misunderstood what
you said.

So if my group by is slow, I would expect some sort of backpressure on the
deserialization step, or maybe on the read from HDFS itself?

Thanks


Re: Backpressure? for Batches

Posted by "Zhijiang(wangzhijiang999)" <wa...@aliyun.com>.
Backpressure arises when the downstream and upstream tasks run concurrently and the downstream is slower than the upstream.
In a streaming job, the scheduling mode schedules both sides concurrently, so backpressure can occur.
For a batch job, the default scheduling mode is LAZY_FROM_SOURCE, as far as I remember. That means the downstream is scheduled after the upstream finishes, so a slower downstream does not block the upstream, and backpressure may not occur in this case.

Best,
Zhijiang