Posted to user@flink.apache.org by Ken Krugler <kk...@transpac.com> on 2017/11/10 21:31:41 UTC

Issue with back pressure and AsyncFunction

Hi all,

I was debugging a curious problem with a streaming job that contained an iteration and several AsyncFunctions.

The entire job would stall out, with no progress being made.

But when I checked back pressure, only one function showed it as being high - everything else was OK.

And when I dumped threads, the only bit of my code that was running was indeed that one function w/high back pressure, stuck while making a collect() call.

There were two issues here…

1. A downstream function in the iteration was (significantly) increasing the number of tuples - it would get one in, and sometimes emit 100+.

The output would loop back as input via the iteration.

This eventually caused the network buffers to fill up, and that’s why the job got stuck.

I had to add my own tracking/throttling in one of my custom functions, to avoid having too many “active” tuples.

So maybe this is something to note in the documentation on iterations, if it’s not there already.
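In case it helps anyone hitting the same thing, here is a rough sketch of the kind of tracking/throttling I mean. It is not the actual code - WorkItem, isCompleted(), expand() and MAX_ACTIVE are all placeholder names - but the idea is to defer expansion when too many tuples are already circulating in the loop, rather than blocking the task thread (blocking would just create another deadlock, since the completions have to come back through that same thread).

import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Simplified sketch with placeholder names; the pending queue isn't checkpointed.
public class ThrottledExpander extends RichFlatMapFunction<WorkItem, WorkItem> {

    private static final int MAX_ACTIVE = 1_000;  // cap on in-flight tuples per subtask

    private transient long active;                // expanded but not yet completed
    private transient Deque<WorkItem> pending;    // inputs waiting to be expanded

    @Override
    public void open(Configuration parameters) {
        active = 0;
        pending = new ArrayDeque<>();
    }

    @Override
    public void flatMap(WorkItem item, Collector<WorkItem> out) {
        if (item.isCompleted()) {
            // A tuple finished its trip around the loop; release capacity.
            // (Completed tuples leave the loop elsewhere in the topology.)
            active--;
        } else {
            // Never expand immediately; everything goes through the queue.
            pending.addLast(item);
        }

        // Only expand queued items while we're under the cap, so the network
        // buffers feeding the iteration's back edge can't fill up without bound.
        while (!pending.isEmpty() && active < MAX_ACTIVE) {
            for (WorkItem expanded : pending.removeFirst().expand()) {  // can be 100+ per input
                active++;
                out.collect(expanded);
            }
        }
    }
}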

2. The back pressure calculation doesn’t take into account AsyncIO

When I double-checked the thread dump, there were actually a number of threads (one for each of my AsyncFunctions) that were stuck calling collect().

These all were named "AsyncIO-Emitter-Thread (<name of AsyncFunction>…)". For example:

> "AsyncIO-Emitter-Thread (MyAsyncFunction -> (<blah>)) (1/1))" #125 daemon prio=5 os_prio=31 tid=0x00007fb191025800 nid=0xac0b in Object.wait() [0x00007000123f0000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> 	at java.lang.Object.wait(Native Method)
> 	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:224)
> 	- locked <0x0000000773cb3ec0> (a java.util.ArrayDeque)
> 	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
> 	- locked <0x0000000773b98020> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
> 	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:85)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:140)
> 	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:42)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:132)
> 	- locked <0x0000000773b1bb70> (a java.lang.Object)
> 	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
> 	at java.lang.Thread.run(Thread.java:748)



I’m assuming that when my AsyncFunction calls collect(), this hands off the tuple to the AsyncIO-Emitter-Thread, which is why none of my code (either AsyncFunctions or threads in my pool doing async stuff) shows up in the thread dump.

And I’m assuming that the back pressure calculation isn’t associating these threads with the source function, which is why they don’t show up in the GUI.
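For reference, the AsyncFunctions in this job all follow the standard pattern, roughly like the sketch below (the body is a placeholder - the real code calls an external service from the pool - and it's written against the AsyncCollector-based API in the Flink version we're running). The point is that my code only hands the result to the framework's collector from a callback, and never touches the network stack directly, which would explain why only the emitter thread shows up as blocked.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

// Simplified sketch; MyAsyncFunction matches the name in the thread dump above,
// but the body here is just an illustration.
public class MyAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        pool = Executors.newFixedThreadPool(10);
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    @Override
    public void asyncInvoke(String input, AsyncCollector<String> collector) {
        pool.submit(() -> {
            String result = callExternalService(input);  // the actual async work
            // Hand the result back to Flink. From here on, the AsyncWaitOperator's
            // Emitter thread is what calls collect() on the downstream output
            // (and blocks on network buffers when they're exhausted).
            collector.collect(Collections.singletonList(result));
        });
    }

    private String callExternalService(String input) {
        return input;  // placeholder for the real external call
    }
}

These are wired into the job with AsyncDataStream.unorderedWait(...); the capacity argument there bounds how many requests are in flight inside the AsyncFunction, but it doesn't change where the eventual collect() into the network stack happens.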

I’m hoping someone can confirm the above. If so, I’ll file an issue.

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


Re: Issue with back pressure and AsyncFunction

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Unfortunately, I don't have anything to add. Yes, back pressure monitoring doesn't work correctly for functions that do their work outside the main task thread, and iterations currently don't work well and can lead to deadlocks.

Did you already open issues for those?

Best,
Aljoscha

> On 10. Nov 2017, at 22:46, Ufuk Celebi <uc...@apache.org> wrote:
> 
> Hey Ken,
> 
> thanks for your message. Both your comments are correct (see inline).
> 
> On Fri, Nov 10, 2017 at 10:31 PM, Ken Krugler
> <kk...@transpac.com> wrote:
>> 1. A downstream function in the iteration was (significantly) increasing the
>> number of tuples - it would get one in, and sometimes emit 100+.
>> 
>> The output would loop back as input via the iteration.
>> 
>> This eventually caused the network buffers to fill up, and that’s why the
>> job got stuck.
>> 
>> I had to add my own tracking/throttling in one of my custom functions, to
>> avoid having too many “active” tuples.
>> 
>> So maybe something to note in documentation on iterations, if it’s not there
>> already.
> 
> Yes, iterations are prone to deadlock due to the way that data is
> exchanged between the sink and head nodes. There have been multiple
> attempts to fix these shortcomings, but I don't know what the latest
> state is. Maybe Aljoscha (CC'd) has some input...
> 
>> 2. The back pressure calculation doesn’t take into account AsyncIO
> 
> Correct, the back pressure monitoring only takes the main task thread
> into account. Every operator that uses a separate thread to emit
> records (like Async I/O or the Kafka source) is therefore not covered by
> the back pressure monitoring.
> 
> – Ufuk


Re: Issue with back pressure and AsyncFunction

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Ken,

thanks for your message. Both your comments are correct (see inline).

On Fri, Nov 10, 2017 at 10:31 PM, Ken Krugler
<kk...@transpac.com> wrote:
> 1. A downstream function in the iteration was (significantly) increasing the
> number of tuples - it would get one in, and sometimes emit 100+.
>
> The output would loop back as input via the iteration.
>
> This eventually caused the network buffers to fill up, and that’s why the
> job got stuck.
>
> I had to add my own tracking/throttling in one of my custom functions, to
> avoid having too many “active” tuples.
>
> So maybe something to note in documentation on iterations, if it’s not there
> already.

Yes, iterations are prone to deadlock due to the way that data is
exchanged between the sink and head nodes. There have been multiple
attempts to fix these shortcomings, but I don't know what the latest
state is. Maybe Aljoscha (CC'd) has some input...

> 2. The back pressure calculation doesn’t take into account AsyncIO

Correct, the back pressure monitoring only takes the main task thread
into account. Every operator that uses a separate thread to emit
records (like Async I/O or the Kafka source) is therefore not covered by
the back pressure monitoring.

– Ufuk