Posted to issues@flink.apache.org by "Greg Hogan (JIRA)" <ji...@apache.org> on 2015/09/15 21:42:46 UTC
[jira] [Updated] (FLINK-2685) TaskManager deadlock on NetworkBufferPool
[ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Greg Hogan updated FLINK-2685:
------------------------------
Description:
This deadlock occurs intermittently. I have a {{join}} followed by a {{chain<join,filter>}} followed by a {{reduceGroup}}. Below are the stack traces and local variables from one thread of each {{join}}.
Both {{join}}s are waiting for a buffer to become available ({{networkBufferPool.availableMemorySegments.count=0}}). Both {{LocalBufferPool}}s have been given extra capacity ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity ({{currentPoolSize=numberOfRequestedMemorySegments=60}}), yet the second {{join}} has not acquired any segments ({{numberOfRequestedMemorySegments=0}}).
{{LocalBufferPool.returnExcessMemorySegments}} only recycles {{MemorySegment}}s from its {{availableMemorySegments}}, so segments that have already been handed out as {{Buffer}}s are released only when those {{Buffer}}s are explicitly recycled.
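To make the accounting above concrete, here is a minimal Java sketch under a toy model. {{GlobalPool}} and {{LocalPool}} are hypothetical stand-ins for {{NetworkBufferPool}} and {{LocalBufferPool}} (not Flink classes), the global pool holds only 60 segments, and a request returns {{null}} where the real pool would block. The model shows that shrinking a pool whose segments are all in use frees nothing, and that only an explicit recycle lets the second pool make progress.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

// Toy model of the pool accounting in this issue. GlobalPool and LocalPool are
// hypothetical stand-ins for NetworkBufferPool and LocalBufferPool, not Flink code.
public class BufferPoolModel {

    /** Shared pool with a fixed number of segments (stand-in for NetworkBufferPool). */
    static final class GlobalPool {
        private int available;
        GlobalPool(int total) { available = total; }
        boolean tryTake() {
            if (available == 0) return false;
            available--;
            return true;
        }
        void give() { available++; }
        int available() { return available; }
    }

    /** Per-task pool: idle segments plus a count of segments taken from the global pool. */
    static final class LocalPool {
        private final GlobalPool global;
        private final ArrayDeque<Object> idle = new ArrayDeque<>();
        private int requested; // segments currently taken from the global pool
        private int poolSize;  // current upper bound on 'requested'

        LocalPool(GlobalPool global, int poolSize) {
            this.global = global;
            this.poolSize = poolSize;
        }

        /** Hands out a segment as a buffer, or returns null where the real pool would block. */
        Object requestBuffer() {
            Object segment = idle.poll();
            if (segment != null) {
                return segment;
            }
            if (requested < poolSize && global.tryTake()) {
                requested++;
                return new Object();
            }
            return null;
        }

        /** A buffer is done: give its segment back to the global pool if over size, else keep it idle. */
        void recycle(Object segment) {
            if (requested > poolSize) {
                requested--;
                global.give();
            } else {
                idle.add(segment);
            }
        }

        /** Changes the bound; like returnExcessMemorySegments, only idle segments are given back. */
        void setPoolSize(int newSize) {
            poolSize = newSize;
            while (requested > poolSize && !idle.isEmpty()) {
                idle.poll();
                requested--;
                global.give();
            }
        }

        int requested() { return requested; }
    }

    /** Returns {first.requested, second got a buffer before recycle (0/1), after recycle (0/1), global available}. */
    static int[] runScenario() {
        GlobalPool global = new GlobalPool(60); // tiny global pool, for illustration only
        LocalPool first = new LocalPool(global, 60);
        LocalPool second = new LocalPool(global, 60);

        // The first pool hands out every segment it can get; none has been recycled yet.
        Object[] inUse = new Object[60];
        for (int i = 0; i < inUse.length; i++) {
            inUse[i] = first.requestBuffer();
        }

        // Shrinking the first pool frees nothing: it has no idle segments.
        first.setPoolSize(30);
        int before = second.requestBuffer() != null ? 1 : 0;

        // An explicit recycle of an in-use buffer is what finally frees a segment.
        first.recycle(inUse[0]);
        int after = second.requestBuffer() != null ? 1 : 0;

        return new int[] {first.requested(), before, after, global.available()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runScenario())); // prints [59, 0, 1, 0]
    }
}
```

In this model the second pool stays empty until the first task recycles a buffer it is still holding, which is the situation the two dumps below capture.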
First join stack trace and variable values from {{LocalBufferPool.requestBuffer}}:
{noformat}
owns: SpanningRecordSerializer<T> (id=723)
waiting for: ArrayDeque<E> (id=724)
Object.wait(long) line: not available [native method]
LocalBufferPool.requestBuffer(boolean) line: 163
LocalBufferPool.requestBufferBlocking() line: 133
RecordWriter<T>.emit(T) line: 92
OutputCollector<T>.collect(T) line: 65
JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}
{noformat}
this LocalBufferPool (id=403)
  availableMemorySegments ArrayDeque<E> (id=398)
    elements Object[16] (id=422)
    head 14
    tail 14
  currentPoolSize 60
  isDestroyed false
  networkBufferPool NetworkBufferPool (id=354)
    allBufferPools HashSet<E> (id=424)
    availableMemorySegments ArrayBlockingQueue<E> (id=427)
      count 0
      items Object[10240] (id=674)
      itrs null
      lock ReentrantLock (id=675)
      notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
      notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
      putIndex 6954
      takeIndex 6954
    factoryLock Object (id=430)
    isDestroyed false
    managedBufferPools HashSet<E> (id=431)
    memorySegmentSize 32768
    numTotalRequiredBuffers 3226
    totalNumberOfMemorySegments 10240
  numberOfRequestedMemorySegments 60
  numberOfRequiredMemorySegments 32
  owner null
  registeredListeners ArrayDeque<E> (id=421)
    elements Object[16] (id=685)
    head 0
    tail 0
askToRecycle false
isBlocking true
{noformat}
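The first trace shows the thread in a timed {{Object.wait}} inside {{LocalBufferPool.requestBuffer}} while it still owns the {{SpanningRecordSerializer}} monitor. As a hedged illustration of that general pattern only (not Flink's implementation; the class name {{BlockingPoolSketch}} and the 2000 ms timeout are assumptions), a blocking request on a monitor-guarded deque looks roughly like this. {{wait}} releases only the monitor of the object it is called on, so any other lock the caller holds stays held for as long as no segment is recycled.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of a blocking buffer request guarded by the deque's monitor.
// Not Flink's LocalBufferPool; the class name and the 2000 ms timeout are assumptions.
public class BlockingPoolSketch {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    /** Blocks until a segment is available, re-checking after every timed wait. */
    byte[] requestBlocking() throws InterruptedException {
        synchronized (available) {
            while (available.isEmpty()) {
                // Releases only this monitor; any other monitor held by the caller
                // (such as a record serializer's) stays locked while waiting.
                available.wait(2000);
            }
            return available.poll();
        }
    }

    /** Returns a segment to the pool and wakes threads blocked in requestBlocking(). */
    void recycle(byte[] segment) {
        synchronized (available) {
            available.add(segment);
            available.notifyAll();
        }
    }

    /** Starts a waiter on an empty pool, recycles one segment, and reports whether the waiter got it. */
    static boolean demo() {
        BlockingPoolSketch pool = new BlockingPoolSketch();
        byte[][] result = new byte[1][];
        Thread waiter = new Thread(() -> {
            try {
                result[0] = pool.requestBlocking();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        pool.recycle(new byte[32768]); // size matches memorySegmentSize in the dump
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return result[0] != null && result[0].length == 32768;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

If no other task ever calls {{recycle}}, the loop in {{requestBlocking}} simply keeps timing out and re-waiting, which is consistent with a thread that stays parked in {{requestBuffer}} indefinitely.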
Second join stack trace and variable values from {{SingleInputGate.getNextBufferOrEvent}}:
{noformat}
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.parkNanos(Object, long) line: 215
AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467
SingleInputGate.getNextBufferOrEvent() line: 414
MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79
MutableRecordReader<T>.next(T) line: 34
ReaderIterator<T>.next(T) line: 59
MutableHashTable$ProbeIterator<PT>.next() line: 1581
MutableHashTable<BT,PT>.processProbeIter() line: 457
MutableHashTable<BT,PT>.nextRecord() line: 555
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}
{noformat}
this SingleInputGate (id=693)
  bufferPool LocalBufferPool (id=706)
    availableMemorySegments ArrayDeque<E> (id=716)
      elements Object[16] (id=717)
      head 0
      tail 0
    currentPoolSize 60
    isDestroyed false
    networkBufferPool NetworkBufferPool (id=354)
      allBufferPools HashSet<E> (id=424)
      availableMemorySegments ArrayBlockingQueue<E> (id=427)
        count 0
        items Object[10240] (id=674)
        itrs null
        lock ReentrantLock (id=675)
        notEmpty AbstractQueuedSynchronizer$ConditionObject (id=678)
        notFull AbstractQueuedSynchronizer$ConditionObject (id=679)
        putIndex 6954
        takeIndex 6954
      factoryLock Object (id=430)
      isDestroyed false
      managedBufferPools HashSet<E> (id=431)
      memorySegmentSize 32768
      numTotalRequiredBuffers 3226
      totalNumberOfMemorySegments 10240
    numberOfRequestedMemorySegments 0
    numberOfRequiredMemorySegments 32
    owner null
    registeredListeners ArrayDeque<E> (id=718)
  channelsWithEndOfPartitionEvents BitSet (id=707)
  consumedResultId IntermediateDataSetID (id=708)
  consumedSubpartitionIndex 24
  executionId ExecutionAttemptID (id=709)
  hasReceivedAllEndOfPartitionEvents false
  inputChannels HashMap<K,V> (id=710)
  inputChannelsWithData LinkedBlockingQueue<E> (id=692)
    capacity 2147483647
    count AtomicInteger (id=698)
      value 0
    head LinkedBlockingQueue$Node<E> (id=701)
    last LinkedBlockingQueue$Node<E> (id=701)
    notEmpty AbstractQueuedSynchronizer$ConditionObject (id=691)
    notFull AbstractQueuedSynchronizer$ConditionObject (id=703)
    putLock ReentrantLock (id=704)
    takeLock ReentrantLock (id=705)
  isReleased false
  jobId JobID (id=711)
  numberOfInputChannels 32
  numberOfUninitializedChannels 0
  owningTaskName "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
  partitionStateChecker NetworkEnvironment$JobManagerPartitionStateChecker (id=363)
  pendingEvents ArrayList<E> (id=713)
  registeredListeners CopyOnWriteArrayList<E> (id=714)
  requestedPartitionsFlag true
  requestLock Object (id=715)
  retriggerLocalRequestTimer null
currentChannel null
{noformat}
> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
> Key: FLINK-2685
> URL: https://issues.apache.org/jira/browse/FLINK-2685
> Project: Flink
> Issue Type: Bug
> Components: Distributed Runtime
> Affects Versions: master
> Reporter: Greg Hogan
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)