You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Zhijiang (Jira)" <ji...@apache.org> on 2020/08/06 04:16:00 UTC

[jira] [Comment Edited] (FLINK-18832) BoundedBlockingSubpartition does not work with StreamTask

    [ https://issues.apache.org/jira/browse/FLINK-18832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171894#comment-17171894 ] 

Zhijiang edited comment on FLINK-18832 at 8/6/20, 4:15 AM:
-----------------------------------------------------------

Thanks for reporting this bug [~trohrmann]!

By design I guess the BoundedBlockingSubpartition assumes no concurrent issue for `#flushCurrentBuffer` method. But actually the task thread and flusher thread can touch this method concurrently. I can think of some options for resolving it:

* Disable flusher thread for batch jobs, because it has no benefits for latency concern as the downstream will only request partition after upstream finishes based on current schedule way. Even it would bring harm for upstream writer to spill partial buffer after flush triggered.
* From long term goal, the flusher thread should be delegated by mailbox model, so we can avoid concurrent issue even if the flusher timeout valid for batch jobs.
* Breaks the previous assumption to allow concurrent access of  `BoundedBlockingSubpartition#flushCurrentBuffer`.

If we can realize the second option soon, then we can bypass this bug. I remembered [~pnowojski]already submitted the PR for it before, but have not merged yet. If this way can not be realized in short time, then i prefer the first option to work around. WDYT?


was (Author: zjwang):
Thanks for reporting this bug [~trohrmann]!

By design I guess the BoundedBlockingSubpartition assumes no concurrent issue for `#flushCurrentBuffer` method. But actually the task thread and flusher thread can touch this method concurrently. I can thought of some options for resolving it:

* Disable flusher thread for batch jobs, because it has no benefits for latency concern as the downstream will only request partition after upstream finishes based on current schedule way. Even it would bring harm for upstream writer to spill partial buffer after flush triggered.
* From long term goal, the flusher thread should be delegated by mailbox model, so we can avoid concurrent issue even if the flusher timeout valid for batch jobs.
* Breaks the previous assumption to allow concurrent access of  `BoundedBlockingSubpartition#flushCurrentBuffer`.

If we can realize the second option soon, then we can bypass this bug. I remembered [~pnowojski]already submitted the PR for it before, but have not merged yet. If this way can not be realized in short time, then i prefer the first option to work around. WDYT?

> BoundedBlockingSubpartition does not work with StreamTask
> ---------------------------------------------------------
>
>                 Key: FLINK-18832
>                 URL: https://issues.apache.org/jira/browse/FLINK-18832
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network, Runtime / Task
>    Affects Versions: 1.10.1, 1.12.0, 1.11.1
>            Reporter: Till Rohrmann
>            Priority: Major
>
> The {{BoundedBlockingSubpartition}} does not work with a {{StreamTask}} because the {{StreamTask}} instantiates an {{OutputFlusher}} which concurrently accesses the {{BoundedBlockingSubpartition}}. This concurrency can lead to a double closing of the underlying {{BufferConsumer}} which manifests in this stack trace:
> {code}
> [9:15 PM] Caused by: org.apache.flink.shaded.netty4.io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
> 	at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain0(ReferenceCountUpdater.java:123)
> 	at org.apache.flink.shaded.netty4.io.netty.util.internal.ReferenceCountUpdater.retain(ReferenceCountUpdater.java:110)
> 	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:80)
> 	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:174)
> 	at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.retainBuffer(NetworkBuffer.java:47)
> 	at org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:127)
> 	at org.apache.flink.runtime.io.network.buffer.ReadOnlySlicedNetworkBuffer.retainBuffer(ReadOnlySlicedNetworkBuffer.java:41)
> 	at org.apache.flink.runtime.io.network.buffer.BufferConsumer.build(BufferConsumer.java:108)
> 	at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.writeAndCloseBufferConsumer(BoundedBlockingSubpartition.java:156)
> 	at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flushCurrentBuffer(BoundedBlockingSubpartition.java:144)
> 	at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.flush(BoundedBlockingSubpartition.java:135)
> 	at org.apache.flink.runtime.io.network.partition.ResultPartition.flushAll(ResultPartition.java:245)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flushAll(RecordWriter.java:183)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.flush(RecordWriterOutput.java:156)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.flushOutputs(OperatorChain.java:344)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:602)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)