Posted to issues@flink.apache.org by "Greg Hogan (JIRA)" <ji...@apache.org> on 2015/09/15 21:41:46 UTC
[jira] [Created] (FLINK-2685) TaskManager deadlock on NetworkBufferPool
Greg Hogan created FLINK-2685:
---------------------------------
Summary: TaskManager deadlock on NetworkBufferPool
Key: FLINK-2685
URL: https://issues.apache.org/jira/browse/FLINK-2685
Project: Flink
Issue Type: Bug
Components: Distributed Runtime
Affects Versions: master
Reporter: Greg Hogan
This deadlock occurs intermittently. I have a {join} followed by a {chain<join,filter>} followed by a {reduceGroup}. Stack traces and local variables from one thread of each {join} are below.
Both {join}s are waiting for a buffer to become available, and the shared {NetworkBufferPool} is exhausted ({networkBufferPool.availableMemorySegments.count=0}). Both {LocalBufferPool}s have been given extra capacity ({currentPoolSize=60 > numberOfRequiredMemorySegments=32}). The first {join}'s pool is at full capacity ({currentPoolSize=numberOfRequestedMemorySegments=60}), yet the second {join}'s pool has not acquired any segments ({numberOfRequestedMemorySegments=0}).
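The buffer accounting above can be illustrated with a deliberately simplified model. This is a sketch under stated assumptions, not Flink's implementation: {GlobalSegmentPool} and {BoundedLocalPool} are hypothetical stand-ins for {NetworkBufferPool} and {LocalBufferPool}, requests are non-blocking (a {null} result marks where the real task would wait), and the global pool is shrunk to 60 segments so that a single local pool can drain it. The model reproduces the reported counters: the global pool is empty, the first pool has requested all 60 of its segments, and the second pool holds none, so neither can make progress.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified model of the state reported in FLINK-2685; not Flink code.
// GlobalSegmentPool stands in for NetworkBufferPool, BoundedLocalPool for LocalBufferPool.
class GlobalSegmentPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();

    GlobalSegmentPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            free.add(new byte[segmentSize]);
        }
    }

    // Returns null instead of blocking when no segment is left.
    byte[] requestSegment() {
        return free.poll();
    }

    int availableSegments() {
        return free.size();
    }
}

class BoundedLocalPool {
    private final GlobalSegmentPool global;
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    final int currentPoolSize;
    int numberOfRequestedSegments; // taken from the global pool, idle or in use

    BoundedLocalPool(GlobalSegmentPool global, int currentPoolSize) {
        this.global = global;
        this.currentPoolSize = currentPoolSize;
    }

    // Non-blocking request: a null result is where the real task would wait.
    byte[] tryRequest() {
        if (available.isEmpty() && numberOfRequestedSegments < currentPoolSize) {
            byte[] segment = global.requestSegment();
            if (segment != null) {
                numberOfRequestedSegments++;
                available.add(segment);
            }
        }
        return available.poll();
    }
}

class DeadlockStateDemo {
    public static void main(String[] args) {
        // Assumption: the global pool holds exactly one local pool's capacity.
        GlobalSegmentPool global = new GlobalSegmentPool(60, 32);
        BoundedLocalPool first = new BoundedLocalPool(global, 60);
        BoundedLocalPool second = new BoundedLocalPool(global, 60);

        // The first pool hands out all 60 segments as buffers that are never recycled.
        for (int i = 0; i < 60; i++) {
            first.tryRequest();
        }

        System.out.println(global.availableSegments());       // 0
        System.out.println(first.numberOfRequestedSegments);  // 60
        System.out.println(first.tryRequest() == null);       // true
        System.out.println(second.tryRequest() == null);      // true
        System.out.println(second.numberOfRequestedSegments); // 0
    }
}
```

Making the request non-blocking keeps the model deterministic; in the real task the equivalent of the {null} result is the {Object.wait(long)} visible in the first stack trace.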
{LocalBufferPool.returnExcessMemorySegments} only recycles {MemorySegment}s from its {availableMemorySegments}, so segments that have been handed out as {Buffer}s are only released once those {Buffer}s are explicitly recycled.
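A hedged sketch of the rule described above, using a hypothetical {ShrinkableLocalPool} (not Flink's {LocalBufferPool}) whose {returnExcessSegments} drains only idle segments, and a plain {ArrayDeque} standing in for the global pool. Shrinking the pool from 60 to 32 while all 60 segments are in use returns nothing; the 28 excess segments only reach the global pool as the in-use buffers are recycled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the excess-return rule; not Flink code.
class ShrinkableLocalPool {
    private final ArrayDeque<byte[]> globalFree; // stands in for NetworkBufferPool
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    int currentPoolSize;
    int numberOfRequestedSegments; // taken from the global pool, idle or in use

    ShrinkableLocalPool(ArrayDeque<byte[]> globalFree, int currentPoolSize) {
        this.globalFree = globalFree;
        this.currentPoolSize = currentPoolSize;
    }

    // Non-blocking request; returns null where the real pool would wait.
    byte[] tryRequest() {
        returnExcessSegments();
        if (available.isEmpty() && numberOfRequestedSegments < currentPoolSize) {
            byte[] segment = globalFree.poll();
            if (segment != null) {
                numberOfRequestedSegments++;
                available.add(segment);
            }
        }
        return available.poll();
    }

    // The explicit recycle: an in-use segment comes back to this pool.
    void recycle(byte[] segment) {
        available.add(segment);
        returnExcessSegments();
    }

    void setPoolSize(int newSize) {
        currentPoolSize = newSize;
        returnExcessSegments();
    }

    // Only idle segments can be handed back; segments still in use are untouched.
    private void returnExcessSegments() {
        while (numberOfRequestedSegments > currentPoolSize) {
            byte[] segment = available.poll();
            if (segment == null) {
                return;
            }
            globalFree.add(segment);
            numberOfRequestedSegments--;
        }
    }
}

class ExcessReturnDemo {
    public static void main(String[] args) {
        ArrayDeque<byte[]> globalFree = new ArrayDeque<>();
        for (int i = 0; i < 60; i++) {
            globalFree.add(new byte[32]);
        }
        ShrinkableLocalPool pool = new ShrinkableLocalPool(globalFree, 60);

        List<byte[]> inUse = new ArrayList<>();
        for (int i = 0; i < 60; i++) {
            inUse.add(pool.tryRequest());
        }

        pool.setPoolSize(32);
        System.out.println(globalFree.size()); // 0: nothing idle to return

        for (byte[] segment : inUse) {
            pool.recycle(segment);
        }
        System.out.println(globalFree.size());                // 28
        System.out.println(pool.numberOfRequestedSegments);   // 32
    }
}
```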
First join stack trace and variable values from {LocalBufferPool.requestBuffer}:
{noformat}
owns: SpanningRecordSerializer<T> (id=723)
waiting for: ArrayDeque<E> (id=724)
Object.wait(long) line: not available [native method]
LocalBufferPool.requestBuffer(boolean) line: 163
LocalBufferPool.requestBufferBlocking() line: 133
RecordWriter<T>.emit(T) line: 92
OutputCollector<T>.collect(T) line: 65
JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}
{noformat}
this LocalBufferPool (id=403)
  availableMemorySegments ArrayDeque<E> (id=398)
    elements Object[16] (id=422)
    head 14
    tail 14
  currentPoolSize 60
  isDestroyed false
  networkBufferPool NetworkBufferPool (id=354)
    allBufferPools HashSet<E> (id=424)
    availableMemorySegments ArrayBlockingQueue<E> (id=427)
      count 0
      items Object[10240] (id=674)
      itrs null
      lock ReentrantLock (id=675)
      notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
      notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
      putIndex 6954
      takeIndex 6954
    factoryLock Object (id=430)
    isDestroyed false
    managedBufferPools HashSet<E> (id=431)
    memorySegmentSize 32768
    numTotalRequiredBuffers 3226
    totalNumberOfMemorySegments 10240
  numberOfRequestedMemorySegments 60
  numberOfRequiredMemorySegments 32
  owner null
  registeredListeners ArrayDeque<E> (id=421)
    elements Object[16] (id=685)
    head 0
    tail 0
askToRecycle false
isBlocking true
{noformat}
Second join stack trace and variable values from {SingleInputGate.getNextBufferOrEvent}:
{noformat}
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.parkNanos(Object, long) line: 215
AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467
SingleInputGate.getNextBufferOrEvent() line: 414
MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79
MutableRecordReader<T>.next(T) line: 34
ReaderIterator<T>.next(T) line: 59
MutableHashTable$ProbeIterator<PT>.next() line: 1581
MutableHashTable<BT,PT>.processProbeIter() line: 457
MutableHashTable<BT,PT>.nextRecord() line: 555
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}
{noformat}
this SingleInputGate (id=693)
  bufferPool LocalBufferPool (id=706)
    availableMemorySegments ArrayDeque<E> (id=716)
      elements Object[16] (id=717)
      head 0
      tail 0
    currentPoolSize 60
    isDestroyed false
    networkBufferPool NetworkBufferPool (id=354)
      allBufferPools HashSet<E> (id=424)
      availableMemorySegments ArrayBlockingQueue<E> (id=427)
        count 0
        items Object[10240] (id=674)
        itrs null
        lock ReentrantLock (id=675)
        notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
        notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
        putIndex 6954
        takeIndex 6954
      factoryLock Object (id=430)
      isDestroyed false
      managedBufferPools HashSet<E> (id=431)
      memorySegmentSize 32768
      numTotalRequiredBuffers 3226
      totalNumberOfMemorySegments 10240
    numberOfRequestedMemorySegments 0
    numberOfRequiredMemorySegments 32
    owner null
    registeredListeners ArrayDeque<E> (id=718)
  channelsWithEndOfPartitionEvents BitSet (id=707)
  consumedResultId IntermediateDataSetID (id=708)
  consumedSubpartitionIndex 24
  executionId ExecutionAttemptID (id=709)
  hasReceivedAllEndOfPartitionEvents false
  inputChannels HashMap<K,V> (id=710)
  inputChannelsWithData LinkedBlockingQueue<E> (id=692)
    capacity 2147483647
    count AtomicInteger (id=698)
      value 0
    head LinkedBlockingQueue$Node<E> (id=701)
    last LinkedBlockingQueue$Node<E> (id=701)
    notEmpty AbstractQueuedSynchronizer$ConditionObject (id=691)
    notFull AbstractQueuedSynchronizer$ConditionObject (id=703)
    putLock ReentrantLock (id=704)
    takeLock ReentrantLock (id=705)
  isReleased false
  jobId JobID (id=711)
  numberOfInputChannels 32
  numberOfUninitializedChannels 0
  owningTaskName "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
  partitionStateChecker NetworkEnvironment$JobManagerPartitionStateChecker (id=363)
  pendingEvents ArrayList<E> (id=713)
  registeredListeners CopyOnWriteArrayList<E> (id=714)
  requestedPartitionsFlag true
  requestLock Object (id=715)
  retriggerLocalRequestTimer null
currentChannel null
{noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)