Posted to user@flink.apache.org by Yury Ruchin <yu...@gmail.com> on 2016/12/17 12:42:42 UTC

Blocking RichFunction.open() and backpressure

Hi all,

I have a streaming job that essentially looks like this: KafkaSource -> Map
-> EventTimeWindow -> RichFlatMap -> CustomSink. The RichFlatMap part does
some heavy lifting in open(), so that the open() call blocks for several
minutes. I assumed that until open() returns the backpressure mechanism
would slow down the entire upstream up to the KafkaSource, so that no new
records would be emitted to the pipeline until the RichFlatMap is ready.
What I actually observe is that the KafkaSource, Map and EventTimeWindow
continue processing - their in/out record and in/out MB counters keep
increasing. The RichFlatMap and its downstream CustomSink show 0, as
expected, until the RichFlatMap is actually done with open(). The
backpressure monitor in the Flink UI shows "OK" for all operators.
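
For reference, the flat map in question is shaped roughly like the sketch
below (the class name, types and the lookup-loading helper are made up for
illustration - the relevant part is only the blocking open()):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical stand-in for the real operator; in the actual job the input
// and output types are the window results and the enriched records.
public class HeavyInitFlatMap extends RichFlatMapFunction<String, String> {

    private transient Map<String, String> lookup;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Blocks for several minutes; flatMap() is not called until it returns.
        lookup = loadLookupTable();
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        out.collect(lookup.getOrDefault(value, value));
    }

    // Placeholder for the expensive setup work.
    private Map<String, String> loadLookupTable() throws Exception {
        Thread.sleep(3 * 60 * 1000L); // simulate minutes of initialization
        return new HashMap<>();
    }
}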

Why doesn't the backpressure mechanism work in this case?

Thanks,
Yury

Re: Blocking RichFunction.open() and backpressure

Posted by Yury Ruchin <yu...@gmail.com>.
Thanks Fabian, that explains what's going on.

Re: Blocking RichFunction.open() and backpressure

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yury,

Flink's operators start processing as soon as they receive data. If an
operator produces more data than its successor task can process, the data
is buffered in Flink's network stack, i.e., its network buffers.
The backpressure mechanism kicks in when all network buffers are in use and
no more data can be buffered. In this case, a producing task will block
until a network buffer becomes available.
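
For illustration (a made-up sink, not your code): even a sink that blocks
in every invoke() call does not stall its producers right away - they keep
writing into the network buffers and only block once those are exhausted:

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink, only to illustrate the buffering behaviour above.
public class SlowSink<T> extends RichSinkFunction<T> {
    @Override
    public void invoke(T value) throws Exception {
        // A consumer that cannot keep up: upstream tasks keep emitting into
        // the network buffers and block only once those are all in use.
        Thread.sleep(1000L);
    }
}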

If the window operator in your job aggregates the data, only the aggregates
will be buffered.
This might explain why the first operators of the job are able to start
processing while the FlatMap operator is still setting itself up.
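
Here is a minimal, self-contained sketch of what I mean (placeholder input
and a processing-time window just to keep it short - not your actual job):
with an incremental ReduceFunction the window operator forwards only one
aggregate per key and window, so very little data has to be buffered
towards the operator that is still in open():

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAggregateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input instead of the Kafka source from the original job.
        DataStream<Tuple2<String, Long>> input = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("b", 1L), Tuple2.of("a", 1L));

        // Incremental aggregation: the window keeps one running sum per key
        // and window and emits only that, not every input record.
        DataStream<Tuple2<String, Long>> aggregates = input
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a,
                                                       Tuple2<String, Long> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                });

        aggregates.print();
        env.execute("window aggregate sketch");
    }
}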

Best,
Fabian
