You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Piotr Nowojski (JIRA)" <ji...@apache.org> on 2019/05/15 12:06:00 UTC

[jira] [Closed] (FLINK-12510) Deadlock when reading from InputGates

     [ https://issues.apache.org/jira/browse/FLINK-12510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski closed FLINK-12510.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.9.0

Merged as commit eb8fff8 into apache:master.

> Deadlock when reading from InputGates
> -------------------------------------
>
>                 Key: FLINK-12510
>                 URL: https://issues.apache.org/jira/browse/FLINK-12510
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.9.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> A refactoring in https://issues.apache.org/jira/browse/FLINK-12434 introduced a potential deadlock, as shown in the following thread dump (from UnionStaticDynamicIterationITCase):
>  
> {noformat}
> "CHAIN Union -> Pipe (4/4)":
> 	at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:63)
> 	- waiting to lock <0x00000000818edc90> (a java.util.HashMap)
> 	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
> 	- locked <0x00000000890001d8> (a java.lang.Object)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:500)
> 	- locked <0x00000000890001e8> (a java.lang.Object)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.requestPartitions(UnionInputGate.java:160)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:183)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:169)
> 	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> 	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> 	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> 	at org.apache.flink.runtime.operators.UnionWithTempOperator.run(UnionWithTempOperator.java:71)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
> 	at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:157)
> 	at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:122)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
> 	at java.lang.Thread.run(Thread.java:748)
> "flink-akka.actor.default-dispatcher-4":
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.queueInputGate(UnionInputGate.java:296)
> 	- waiting to lock <0x00000000890019a8> (a java.util.LinkedHashSet)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.lambda$new$0(UnionInputGate.java:119)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate$$Lambda$224/305821273.run(Unknown Source)
> 	at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
> 	at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687)
> 	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.queueChannel(SingleInputGate.java:672)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:643)
> 	at org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
> 	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.notifyDataAvailable(LocalInputChannel.java:203)
> 	at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition.createReadView(BoundedBlockingSubpartition.java:216)
> 	- locked <0x0000000084c068a8> (a java.lang.Object)
> 	at org.apache.flink.runtime.io.network.partition.ResultPartition.createSubpartitionView(ResultPartition.java:350)
> 	at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:71)
> 	- locked <0x00000000818edc90> (a java.util.HashMap)
> 	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
> 	- locked <0x0000000089001fa8> (a java.lang.Object)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.updateInputChannel(SingleInputGate.java:382)
> 	- locked <0x0000000084c06948> (a java.lang.Object)
> 	at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$updatePartitions$1(TaskExecutor.java:626)
> 	at org.apache.flink.runtime.taskexecutor.TaskExecutor$$Lambda$222/1536847067.run(Unknown Source)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> "CHAIN Union -> Pipe (3/4)":
> 	at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.onConsumedPartition(ResultPartitionManager.java:114)
> 	- waiting to lock <0x00000000818edc90> (a java.util.HashMap)
> 	at org.apache.flink.runtime.io.network.partition.ResultPartition.onConsumedSubpartition(ResultPartition.java:438)
> 	at org.apache.flink.runtime.io.network.partition.ResultSubpartition.onConsumedSubpartition(ResultSubpartition.java:58)
> 	at org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionReader.notifySubpartitionConsumed(BoundedBlockingSubpartitionReader.java:99)
> 	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.notifySubpartitionConsumed(LocalInputChannel.java:243)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.transformToBufferOrEvent(SingleInputGate.java:617)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:538)
> 	at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:519)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextData(UnionInputGate.java:217)
> 	- locked <0x00000000890019a8> (a java.util.LinkedHashSet)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:185)
> 	at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:169)
> 	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> 	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> 	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> 	at org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator.next(SpillingResettableMutableObjectIterator.java:149)
> 	at org.apache.flink.runtime.operators.UnionWithTempOperator.run(UnionWithTempOperator.java:77)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504)
> 	at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:157)
> 	at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:122)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
> 	at java.lang.Thread.run(Thread.java:748)
> Found 1 deadlock.
> {noformat}
> Full Travis CI job log: https://api.travis-ci.org/v3/job/531956581/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)