Posted to issues@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2020/12/21 07:02:00 UTC

[jira] [Closed] (FLINK-20618) Some of the source operator subtasks will stuck when flink job in critical backpressure

     [ https://issues.apache.org/jira/browse/FLINK-20618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski closed FLINK-20618.
----------------------------------
    Resolution: Not A Bug

Ok, thanks for coming back. I'm closing this as not a bug for now.

Indeed, this kind of network error is very strange. I would expect some TCP connection error after some time. For example, isn't TCP's [keep alive|https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive] default value 2 hours?
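
To put a rough number on that expectation, here is a minimal sketch of how long keepalive would need to flag a dead, idle connection. It assumes the stock Linux settings (tcp_keepalive_time=7200 s, tcp_keepalive_intvl=75 s, tcp_keepalive_probes=9) and that SO_KEEPALIVE is actually enabled on the socket; neither assumption has been checked against the reporter's cluster, and the class and method names are made up for illustration.

```java
// Hypothetical helper, not part of Flink: estimates how long TCP keepalive
// needs to declare an idle connection dead.
final class KeepAliveEstimate {

    // Idle time before the first probe, plus one interval per unanswered probe.
    static long detectionSeconds(long idleSeconds, long intervalSeconds, int probes) {
        return idleSeconds + intervalSeconds * probes;
    }

    public static void main(String[] args) {
        // Assumed Linux defaults: 7200 s idle, 75 s between probes, 9 probes.
        long seconds = detectionSeconds(7200, 75, 9);
        System.out.println(seconds + " s"); // 7875 s, i.e. 2 h 11 min 15 s
    }
}
```

So with those defaults an error should show up a bit over two hours after the connection goes silent, provided keepalive is switched on for that socket at all.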

Anyway, please re-open or comment on this ticket if you have any new findings.

> Some of the source operator subtasks will stuck when flink job in critical backpressure
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20618
>                 URL: https://issues.apache.org/jira/browse/FLINK-20618
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.10.0, 1.11.1
>            Reporter: zlzhang0122
>            Priority: Critical
>         Attachments: 2020-12-16 11-47-37 的屏幕截图.png, 2020-12-16 11-48-30 的屏幕截图.png, 2020-12-16 11-49-01 的屏幕截图.png, 2020-12-16 11-53-42 的屏幕截图.png, 2020-12-17 11-10-06 的屏幕截图.png, 2020-12-17 16-45-00 的屏幕截图.png, stuck_node.txt, stuck_node_downstream.txt
>
>
> Under severe backpressure, some of the source subtasks block while requesting a buffer because the LocalBufferPool is full, so the whole task is stuck while the other tasks run fine.
> Below is the jstack trace:
>  
> Legacy Source Thread - Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #113 prio=5 os_prio=0 tid=0x00007f43d07e1800 nid=0x1b1c waiting on condition [0x00007f43b8488000]
> java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000000db234488> (a java.util.concurrent.CompletableFuture$Signaller)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
>     at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at StreamExecCalc$33.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at SourceConversion$4.processElement(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:732)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:710)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.connectors.talos.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:379)
>     - locked <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.connectors.talos.internals.TalosFetcher2.runFetchLoop(TalosFetcher2.java:249)
>     at org.apache.flink.streaming.connectors.talos.FlinkTalosConsumerBase.run(FlinkTalosConsumerBase.java:758)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>  
>  
>  
> Source: TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl) -> SourceConversion(table=[default_catalog.default_database.transfer_c5, source: [TalosTableSource(hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl)]], fields=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl]) -> Calc(select=[hash, timestamp, step, isCustomize, hostname, endpoint, metric, dsType, orgId, idc, labels, pdl, () AS $12, (timestamp - ((timestamp + 18000) MOD 86400)) AS $13]) (62/128) #108 prio=5 os_prio=0 tid=0x00007f44dc178000 nid=0x1332 waiting for monitor entry [0x00007f443dfd8000]
> java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:86)
>     - waiting to lock <0x00000000d8d50fa8> (a java.lang.Object)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
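
The two traces above fit together as follows: thread #113 holds the monitor 0x00000000d8d50fa8 while parked in CompletableFuture.get inside LocalBufferPool.requestMemorySegmentBlocking, and thread #108 is BLOCKED waiting for that same monitor. The standalone sketch below reproduces that pair of thread states with plain JDK classes. It is not Flink code; checkpointLock and bufferAvailable are hypothetical stand-ins for the locked object and the buffer-availability future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Illustrative only: one thread waits on a future while holding a monitor,
// a second thread blocks on that monitor until the future completes.
final class LockHeldWhileWaitingDemo {

    static Thread.State[] observeStates() throws InterruptedException {
        Object checkpointLock = new Object();                  // stand-in for the locked java.lang.Object
        CompletableFuture<Void> bufferAvailable = new CompletableFuture<>();
        CountDownLatch lockHeld = new CountDownLatch(1);

        Thread source = new Thread(() -> {
            synchronized (checkpointLock) {
                lockHeld.countDown();
                bufferAvailable.join();                        // parks while still holding the lock
            }
        }, "legacy-source");
        source.start();
        lockHeld.await();
        while (source.getState() != Thread.State.WAITING) {
            Thread.sleep(10);                                  // wait until the source thread has parked
        }

        Thread mailbox = new Thread(() -> {
            synchronized (checkpointLock) {
                // would run a mail action here
            }
        }, "mailbox");
        mailbox.start();
        while (mailbox.getState() != Thread.State.BLOCKED) {
            Thread.sleep(10);                                  // wait until it contends for the monitor
        }

        Thread.State[] states = { source.getState(), mailbox.getState() };

        bufferAvailable.complete(null);                        // a "buffer" is recycled: both threads proceed
        source.join();
        mailbox.join();
        return states;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.State[] states = observeStates();
        System.out.println("source=" + states[0] + ", mailbox=" + states[1]);
    }
}
```

In the sketch both threads resume as soon as the future completes. By analogy, the stuck subtask would recover once a buffer is returned to its pool, which suggests looking at why buffers are never recycled downstream rather than at the locking itself.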


