Posted to issues@flink.apache.org by "Ufuk Celebi (JIRA)" <ji...@apache.org> on 2015/09/15 23:12:46 UTC
[jira] [Assigned] (FLINK-2685) TaskManager deadlock on NetworkBufferPool
[ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ufuk Celebi reassigned FLINK-2685:
----------------------------------
Assignee: Ufuk Celebi
> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
> Key: FLINK-2685
> URL: https://issues.apache.org/jira/browse/FLINK-2685
> Project: Flink
> Issue Type: Bug
> Components: Distributed Runtime
> Affects Versions: master
> Reporter: Greg Hogan
> Assignee: Ufuk Celebi
>
> This deadlock occurs intermittently. I have a {{join}} followed by a {{chain<join,filter>}} followed by a {{reduceGroup}}. Below are the stack traces and local variable values from one thread of each {{join}}.
> The {{join}}s are waiting on a buffer to become available ({{networkBufferPool.availableMemorySegments.count=0}}). Both {{LocalBufferPool}}s have been given extra capacity ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity ({{currentPoolSize=numberOfRequestedMemorySegments=60}}), yet the second {{join}} has not acquired any segments ({{numberOfRequestedMemorySegments=0}}).
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles {{MemorySegment}}s from its {{availableMemorySegments}}, so any requested {{Buffer}}s are released only when they are explicitly recycled.
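The accounting described above can be illustrated with a minimal sketch. This is not Flink's code. `GlobalSegmentPool` and `SketchLocalPool` are hypothetical, simplified stand-ins for `NetworkBufferPool` and `LocalBufferPool`, and segments are modeled as plain integers.

The sketch shows two things:
- Shrinking a local pool only hands back segments that are idle in its available queue.
- Segments currently out as buffers return to the global pool only when they are recycled.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for NetworkBufferPool: a fixed set of segment ids.
class GlobalSegmentPool {
    private final ArrayDeque<Integer> available = new ArrayDeque<>();

    GlobalSegmentPool(int totalSegments) {
        for (int i = 0; i < totalSegments; i++) {
            available.add(i);
        }
    }

    synchronized Integer requestSegment() {
        return available.poll(); // null when the global pool is exhausted
    }

    synchronized void recycleSegment(int segment) {
        available.add(segment);
    }

    synchronized int availableCount() {
        return available.size();
    }
}

// Hypothetical, simplified stand-in for LocalBufferPool accounting.
class SketchLocalPool {
    private final GlobalSegmentPool global;
    private final ArrayDeque<Integer> availableSegments = new ArrayDeque<>();
    private int currentPoolSize;
    private int numberOfRequestedSegments;

    SketchLocalPool(GlobalSegmentPool global, int poolSize) {
        this.global = global;
        this.currentPoolSize = poolSize;
    }

    // Non-blocking: reuse an idle segment, else borrow from the global pool while
    // under currentPoolSize. Null means a blocking caller would have to wait.
    synchronized Integer requestBuffer() {
        if (!availableSegments.isEmpty()) {
            return availableSegments.poll();
        }
        if (numberOfRequestedSegments < currentPoolSize) {
            Integer segment = global.requestSegment();
            if (segment != null) {
                numberOfRequestedSegments++;
                return segment;
            }
        }
        return null;
    }

    // A buffer that is no longer needed goes back to the global pool if this
    // pool is over its size, otherwise it is kept as an idle segment.
    synchronized void recycle(int segment) {
        if (numberOfRequestedSegments > currentPoolSize) {
            global.recycleSegment(segment);
            numberOfRequestedSegments--;
        } else {
            availableSegments.add(segment);
        }
    }

    // Shrinking returns only idle segments; segments handed out as buffers
    // still count against this pool until they are recycled.
    synchronized void setNumBuffers(int newSize) {
        currentPoolSize = newSize;
        while (numberOfRequestedSegments > currentPoolSize && !availableSegments.isEmpty()) {
            global.recycleSegment(availableSegments.poll());
            numberOfRequestedSegments--;
        }
    }

    synchronized int requested() {
        return numberOfRequestedSegments;
    }
}

class ShrinkDemo {
    public static void main(String[] args) {
        GlobalSegmentPool global = new GlobalSegmentPool(4);
        SketchLocalPool producer = new SketchLocalPool(global, 4);
        SketchLocalPool consumer = new SketchLocalPool(global, 2);

        int[] inFlight = new int[4];
        for (int i = 0; i < 4; i++) {
            inFlight[i] = producer.requestBuffer(); // every segment is now in flight
        }
        producer.setNumBuffers(2); // nothing idle, so nothing goes back
        System.out.println(global.availableCount()); // 0
        System.out.println(consumer.requestBuffer()); // null: consumer is starved
        producer.recycle(inFlight[0]); // over size, so it returns to the global pool
        System.out.println(global.availableCount()); // 1
    }
}
```

In this model, the consumer can make progress only after the producer's in-flight buffers are recycled. That matches the situation where the first {{join}} holds all 60 segments and the second holds none.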
> First join stack trace and variable values from {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer<T> (id=723)
> waiting for: ArrayDeque<E> (id=724)
> Object.wait(long) line: not available [native method]
> LocalBufferPool.requestBuffer(boolean) line: 163
> LocalBufferPool.requestBufferBlocking() line: 133
> RecordWriter<T>.emit(T) line: 92
> OutputCollector<T>.collect(T) line: 65
> JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
> JoinDriver<IT1,IT2,OT>.run() line: 208
> RegularPactTask<S,OT>.run() line: 489
> RegularPactTask<S,OT>.invoke() line: 354
> Task.run() line: 581
> Thread.run() line: 745
> {noformat}
> {noformat}
> this LocalBufferPool (id=403)
> availableMemorySegments ArrayDeque<E> (id=398)
> elements Object[16] (id=422)
> head 14
> tail 14
> currentPoolSize 60
> isDestroyed false
> networkBufferPool NetworkBufferPool (id=354)
> allBufferPools HashSet<E> (id=424)
> availableMemorySegments ArrayBlockingQueue<E> (id=427)
> count 0
> items Object[10240] (id=674)
> itrs null
> lock ReentrantLock (id=675)
> notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
> notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
> putIndex 6954
> takeIndex 6954
> factoryLock Object (id=430)
> isDestroyed false
> managedBufferPools HashSet<E> (id=431)
> memorySegmentSize 32768
> numTotalRequiredBuffers 3226
> totalNumberOfMemorySegments 10240
> numberOfRequestedMemorySegments 60
> numberOfRequiredMemorySegments 32
> owner null
> registeredListeners ArrayDeque<E> (id=421)
> elements Object[16] (id=685)
> head 0
> tail 0
> askToRecycle false
> isBlocking true
> {noformat}
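The first {{join}} is parked in {{Object.wait(long)}} inside {{LocalBufferPool.requestBuffer}}, with {{isBlocking true}} and {{askToRecycle false}}. A minimal sketch of such a timed-wait request loop follows. `BlockingSketchPool`, `requestBlocking`, and the `maxWaits` bound are hypothetical; the bound exists only so the sketch terminates. The loop is assumed to make progress only when the global pool has a free segment or when a buffer is recycled back into this pool.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of a blocking buffer request with a timed wait,
// loosely modeled on LocalBufferPool.requestBuffer -> Object.wait(long).
class BlockingSketchPool {
    private final ArrayDeque<Integer> availableSegments = new ArrayDeque<>();
    private final ArrayDeque<Integer> globalSegments; // stand-in for NetworkBufferPool
    private final int currentPoolSize;
    private int numberOfRequestedSegments;

    BlockingSketchPool(ArrayDeque<Integer> globalSegments, int currentPoolSize) {
        this.globalSegments = globalSegments;
        this.currentPoolSize = currentPoolSize;
    }

    // Returns a segment, or null after maxWaits timed waits without progress.
    Integer requestBlocking(long waitMillis, int maxWaits) throws InterruptedException {
        synchronized (availableSegments) {
            int waits = 0;
            while (availableSegments.isEmpty()) {
                if (numberOfRequestedSegments < currentPoolSize) {
                    Integer segment;
                    synchronized (globalSegments) {
                        segment = globalSegments.poll();
                    }
                    if (segment != null) {
                        numberOfRequestedSegments++;
                        availableSegments.add(segment);
                        continue;
                    }
                }
                if (waits++ == maxWaits) {
                    return null;
                }
                // Woken by recycle() or by the timeout; then the loop re-checks
                // the local queue and the global pool.
                availableSegments.wait(waitMillis);
            }
            return availableSegments.poll();
        }
    }

    void recycle(int segment) {
        synchronized (availableSegments) {
            availableSegments.add(segment);
            availableSegments.notifyAll();
        }
    }
}

class BlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        ArrayDeque<Integer> global = new ArrayDeque<>();
        global.add(0); // a single free segment in the global pool
        BlockingSketchPool pool = new BlockingSketchPool(global, 60);

        int first = pool.requestBlocking(10, 3); // borrowed from the global pool
        // Global pool exhausted and nothing recycled: the request gives up.
        System.out.println(pool.requestBlocking(10, 3)); // null

        // Recycling from another thread wakes the waiting requester.
        Thread recycler = new Thread(() -> pool.recycle(first));
        recycler.start();
        System.out.println(pool.requestBlocking(10, 1000)); // 0
        recycler.join();
    }
}
```

Without the `maxWaits` bound, the middle call would loop forever. That is what the first {{join}} appears to be doing when the global count is 0 and none of its buffers are recycled.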
> Second join stack trace and variable values from {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]
> LockSupport.parkNanos(Object, long) line: 215
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
> LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467
> SingleInputGate.getNextBufferOrEvent() line: 414
> MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79
> MutableRecordReader<T>.next(T) line: 34
> ReaderIterator<T>.next(T) line: 59
> MutableHashTable$ProbeIterator<PT>.next() line: 1581
> MutableHashTable<BT,PT>.processProbeIter() line: 457
> MutableHashTable<BT,PT>.nextRecord() line: 555
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
> JoinDriver<IT1,IT2,OT>.run() line: 208
> RegularPactTask<S,OT>.run() line: 489
> RegularPactTask<S,OT>.invoke() line: 354
> Task.run() line: 581
> Thread.run() line: 745
> {noformat}
> {noformat}
> this SingleInputGate (id=693)
> bufferPool LocalBufferPool (id=706)
> availableMemorySegments ArrayDeque<E> (id=716)
> elements Object[16] (id=717)
> head 0
> tail 0
> currentPoolSize 60
> isDestroyed false
> networkBufferPool NetworkBufferPool (id=354)
> allBufferPools HashSet<E> (id=424)
> availableMemorySegments ArrayBlockingQueue<E> (id=427)
> count 0
> items Object[10240] (id=674)
> itrs null
> lock ReentrantLock (id=675)
> notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
> notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
> putIndex 6954
> takeIndex 6954
> factoryLock Object (id=430)
> isDestroyed false
> managedBufferPools HashSet<E> (id=431)
> memorySegmentSize 32768
> numTotalRequiredBuffers 3226
> totalNumberOfMemorySegments 10240
> numberOfRequestedMemorySegments 0
> numberOfRequiredMemorySegments 32
> owner null
> registeredListeners ArrayDeque<E> (id=718)
> channelsWithEndOfPartitionEvents BitSet (id=707)
> consumedResultId IntermediateDataSetID (id=708)
> consumedSubpartitionIndex 24
> executionId ExecutionAttemptID (id=709)
> hasReceivedAllEndOfPartitionEvents false
> inputChannels HashMap<K,V> (id=710)
> inputChannelsWithData LinkedBlockingQueue<E> (id=692)
> capacity 2147483647
> count AtomicInteger (id=698)
> value 0
> head LinkedBlockingQueue$Node<E> (id=701)
> last LinkedBlockingQueue$Node<E> (id=701)
> notEmpty AbstractQueuedSynchronizer$ConditionObject (id=691)
> notFull AbstractQueuedSynchronizer$ConditionObject (id=703)
> putLock ReentrantLock (id=704)
> takeLock ReentrantLock (id=705)
> isReleased false
> jobId JobID (id=711)
> numberOfInputChannels 32
> numberOfUninitializedChannels 0
> owningTaskName "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
> partitionStateChecker NetworkEnvironment$JobManagerPartitionStateChecker (id=363)
> pendingEvents ArrayList<E> (id=713)
> registeredListeners CopyOnWriteArrayList<E> (id=714)
> requestedPartitionsFlag true
> requestLock Object (id=715)
> retriggerLocalRequestTimer null
> currentChannel null
> {noformat}
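On the consumer side, the second {{join}} polls {{inputChannelsWithData}} with a timeout while that queue is empty ({{count value 0}}). A minimal sketch of why the queue could stay empty follows. It assumes that a network input channel must first obtain a buffer from the gate's {{LocalBufferPool}} before received data can be announced. `SketchInputGate`, `onDataArrived`, and `getNextChannelWithData` are hypothetical names, and the `Supplier` stands in for the buffer pool.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of an input gate that waits on a queue of channels with
// data, loosely modeled on SingleInputGate.getNextBufferOrEvent ->
// LinkedBlockingQueue.poll(long, TimeUnit).
class SketchInputGate {
    private final LinkedBlockingQueue<Integer> inputChannelsWithData = new LinkedBlockingQueue<>();
    private final Supplier<Integer> bufferProvider; // stand-in for the gate's buffer pool

    SketchInputGate(Supplier<Integer> bufferProvider) {
        this.bufferProvider = bufferProvider;
    }

    // Assumption: incoming data needs a local buffer before the channel
    // can be announced as having data.
    boolean onDataArrived(int channelIndex) {
        Integer buffer = bufferProvider.get();
        if (buffer == null) {
            return false; // no buffer: the data cannot be accepted
        }
        inputChannelsWithData.add(channelIndex);
        return true;
    }

    // Polls with a timeout; maxPolls bounds the loop so the sketch terminates.
    Integer getNextChannelWithData(long timeoutMillis, int maxPolls) throws InterruptedException {
        for (int i = 0; i < maxPolls; i++) {
            Integer channel = inputChannelsWithData.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (channel != null) {
                return channel;
            }
        }
        return null;
    }
}

class GateDemo {
    public static void main(String[] args) throws InterruptedException {
        // Exhausted pool: no buffer, so arriving data never becomes visible.
        SketchInputGate starved = new SketchInputGate(() -> null);
        System.out.println(starved.onDataArrived(5));              // false
        System.out.println(starved.getNextChannelWithData(10, 3)); // null

        // With a buffer available, the same arrival is handed to the reader.
        SketchInputGate fed = new SketchInputGate(() -> 0);
        System.out.println(fed.onDataArrived(5));                  // true
        System.out.println(fed.getNextChannelWithData(10, 3));     // 5
    }
}
```

Under this assumption, the two dumps form a cycle:
- The producer cannot get a new buffer until its in-flight buffers are consumed and recycled.
- The consumer cannot receive them, because the global pool has no segment left for its pool.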