Posted to issues@flink.apache.org by "Greg Hogan (JIRA)" <ji...@apache.org> on 2015/09/15 21:41:46 UTC

[jira] [Created] (FLINK-2685) TaskManager deadlock on NetworkBufferPool

Greg Hogan created FLINK-2685:
---------------------------------

             Summary: TaskManager deadlock on NetworkBufferPool
                 Key: FLINK-2685
                 URL: https://issues.apache.org/jira/browse/FLINK-2685
             Project: Flink
          Issue Type: Bug
          Components: Distributed Runtime
    Affects Versions: master
            Reporter: Greg Hogan


This deadlock occurs intermittently. My job has a {join} followed by a {chain<join,filter>} followed by a {reduceGroup}. Below are the stack traces and local variables from one thread of each of the two {join}s.

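The {join}s are waiting on a buffer to become available, and the {NetworkBufferPool} has no free segments left ({networkBufferPool.availableMemorySegments.count=0}): the first {join} blocks while emitting a record in {LocalBufferPool.requestBuffer}, the second while waiting for input in {SingleInputGate.getNextBufferOrEvent}. Both {LocalBufferPool}s have been granted extra capacity ({currentPoolSize=60 > numberOfRequiredMemorySegments=32}). The first {join} is at full capacity ({currentPoolSize=numberOfRequestedMemorySegments=60}), yet the second {join} has not acquired a single segment ({numberOfRequestedMemorySegments=0}).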
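The pool state described above can be reproduced in a small model. This is a sketch under stated assumptions, not Flink's code: {ModelGlobalPool} and {ModelLocalPool} are hypothetical stand-ins for {NetworkBufferPool} and {LocalBufferPool}, the numbers are scaled down so that only 60 segments exist in total, and a local pool is assumed to grow up to {currentPoolSize} only by taking free segments from the global pool. Once the first pool has taken every free segment, the second pool obtains nothing, even though its own {currentPoolSize} would allow 60.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for NetworkBufferPool: a shared set of free segments.
final class ModelGlobalPool {
    private final ArrayDeque<byte[]> freeSegments = new ArrayDeque<>();

    ModelGlobalPool(int totalSegments, int segmentSize) {
        for (int i = 0; i < totalSegments; i++) {
            freeSegments.add(new byte[segmentSize]);
        }
    }

    /** Returns a free segment, or null once the global pool is exhausted. */
    synchronized byte[] requestSegment() {
        return freeSegments.poll();
    }

    synchronized int availableSegments() {
        return freeSegments.size();
    }
}

// Hypothetical stand-in for LocalBufferPool, tracking only the counters seen in the dump.
final class ModelLocalPool {
    private final ModelGlobalPool global;
    final int numberOfRequiredSegments;
    final int currentPoolSize;
    int numberOfRequestedSegments;

    ModelLocalPool(ModelGlobalPool global, int required, int currentPoolSize) {
        this.global = global;
        this.numberOfRequiredSegments = required;
        this.currentPoolSize = currentPoolSize;
    }

    /** Non-blocking: succeeds only while below currentPoolSize AND the global pool has a free segment. */
    byte[] tryRequestSegment() {
        if (numberOfRequestedSegments >= currentPoolSize) {
            return null;
        }
        byte[] segment = global.requestSegment();
        if (segment != null) {
            numberOfRequestedSegments++;
        }
        return segment;
    }
}
```

With one global pool of 60 segments and two local pools of size 60 (required 32), draining the first pool leaves the second at zero requested segments, which matches the counters in the dumps.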

{LocalBufferPool.returnExcessMemorySegments} only recycles {MemorySegment}s from its {availableMemorySegments} queue, so segments already handed out as {Buffer}s are released only when those {Buffer}s are explicitly recycled.
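The following sketch models why shrinking a pool does not free segments that are in use. {ShrinkableLocalPool} is hypothetical and only mirrors the behaviour described above; it assumes that excess segments are returned to the global pool from the available queue, both when the pool is resized and whenever a buffer is recycled. Lowering the size from 60 to 32 while all 60 segments are handed out returns nothing; segments flow back only one recycled buffer at a time.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a local pool; field names follow the debugger dump, the logic is an assumption.
final class ShrinkableLocalPool {
    private final Queue<byte[]> globalFreeSegments; // stands in for NetworkBufferPool
    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();
    int currentPoolSize;
    int numberOfRequestedSegments;

    ShrinkableLocalPool(Queue<byte[]> globalFreeSegments, int poolSize) {
        this.globalFreeSegments = globalFreeSegments;
        this.currentPoolSize = poolSize;
    }

    /** Hands out a segment, growing the pool from the global pool if allowed; null if none. */
    byte[] requestBuffer() {
        if (availableSegments.isEmpty() && numberOfRequestedSegments < currentPoolSize) {
            byte[] segment = globalFreeSegments.poll();
            if (segment != null) {
                numberOfRequestedSegments++;
                availableSegments.add(segment);
            }
        }
        return availableSegments.poll();
    }

    /** Called when a consumer is done with a buffer. */
    void recycle(byte[] segment) {
        availableSegments.add(segment);
        returnExcessSegments();
    }

    void setNumBuffers(int newPoolSize) {
        currentPoolSize = newPoolSize;
        returnExcessSegments();
    }

    /** Only segments sitting in availableSegments can be returned; handed-out buffers are untouched. */
    private void returnExcessSegments() {
        while (numberOfRequestedSegments > currentPoolSize) {
            byte[] segment = availableSegments.poll();
            if (segment == null) {
                return; // the remaining excess is in use and comes back only via recycle()
            }
            globalFreeSegments.add(segment);
            numberOfRequestedSegments--;
        }
    }
}
```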

First join stack trace and variable values from {LocalBufferPool.requestBuffer}:
{noformat}
owns: SpanningRecordSerializer<T>  (id=723)	
waiting for: ArrayDeque<E>  (id=724)	
Object.wait(long) line: not available [native method]	
LocalBufferPool.requestBuffer(boolean) line: 163	
LocalBufferPool.requestBufferBlocking() line: 133	
RecordWriter<T>.emit(T) line: 92	
OutputCollector<T>.collect(T) line: 65	
JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088	
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137	
JoinDriver<IT1,IT2,OT>.run() line: 208	
RegularPactTask<S,OT>.run() line: 489	
RegularPactTask<S,OT>.invoke() line: 354	
Task.run() line: 581	
Thread.run() line: 745	
{noformat}

{noformat}
this	LocalBufferPool  (id=403)	
	availableMemorySegments	ArrayDeque<E>  (id=398)	
		elements	Object[16]  (id=422)	
		head	14	
		tail	14	
	currentPoolSize	60	
	isDestroyed	false	
	networkBufferPool	NetworkBufferPool  (id=354)	
		allBufferPools	HashSet<E>  (id=424)	
		availableMemorySegments	ArrayBlockingQueue<E>  (id=427)	
			count	0	
			items	Object[10240]  (id=674)	
			itrs	null	
			lock	ReentrantLock  (id=675)	
			notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=678)	
			notFull	AbstractQueuedSynchronizer$ConditionObject  (id=679)	
			putIndex	6954	
			takeIndex	6954	
		factoryLock	Object  (id=430)	
		isDestroyed	false	
		managedBufferPools	HashSet<E>  (id=431)	
		memorySegmentSize	32768	
		numTotalRequiredBuffers	3226	
		totalNumberOfMemorySegments	10240	
	numberOfRequestedMemorySegments	60	
	numberOfRequiredMemorySegments	32	
	owner	null	
	registeredListeners	ArrayDeque<E>  (id=421)	
		elements	Object[16]  (id=685)	
		head	0	
		tail	0	
askToRecycle	false	
isBlocking	true	
{noformat}
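The first trace is parked in {Object.wait(long)} inside {LocalBufferPool.requestBuffer} with {isBlocking=true}. A timed wait loop of the following shape, shown as a hypothetical sketch rather than Flink's code, re-checks the queue on every wake-up and therefore only makes progress once some other thread recycles a segment into this pool. The 100 ms timeout is an arbitrary assumption. Note that {Object.wait} releases only the monitor it is called on, so any other lock the caller holds (compare "owns: SpanningRecordSerializer" in the trace) stays held while waiting.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of a blocking buffer request; not Flink's implementation.
final class BlockingLocalPool {
    private static final long WAIT_MILLIS = 100; // assumed timeout for each wait

    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();

    /**
     * Returns a segment, or null if isBlocking is false and none is available
     * (also null if the waiting thread is interrupted).
     */
    byte[] requestBuffer(boolean isBlocking) {
        synchronized (availableSegments) {
            while (availableSegments.isEmpty()) {
                if (!isBlocking) {
                    return null;
                }
                try {
                    // Releases only the monitor on availableSegments, not other locks held by the caller.
                    availableSegments.wait(WAIT_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return availableSegments.poll();
        }
    }

    /** Returns a segment to the pool and wakes any waiting requesters. */
    void recycle(byte[] segment) {
        synchronized (availableSegments) {
            availableSegments.add(segment);
            availableSegments.notifyAll();
        }
    }
}
```

If nothing is ever recycled into the pool, the loop above simply keeps timing out and re-checking, which is consistent with a thread that appears permanently stuck in {Object.wait(long)}.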

Second join stack trace and variable values from {SingleInputGate.getNextBufferOrEvent}:
{noformat}
Unsafe.park(boolean, long) line: not available [native method]	
LockSupport.parkNanos(Object, long) line: 215	
AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078	
LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467	
SingleInputGate.getNextBufferOrEvent() line: 414	
MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79	
MutableRecordReader<T>.next(T) line: 34	
ReaderIterator<T>.next(T) line: 59	
MutableHashTable$ProbeIterator<PT>.next() line: 1581	
MutableHashTable<BT,PT>.processProbeIter() line: 457	
MutableHashTable<BT,PT>.nextRecord() line: 555	
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110	
JoinDriver<IT1,IT2,OT>.run() line: 208	
RegularPactTask<S,OT>.run() line: 489	
RegularPactTask<S,OT>.invoke() line: 354	
Task.run() line: 581	
Thread.run() line: 745	
{noformat}

{noformat}
this	SingleInputGate  (id=693)	
	bufferPool	LocalBufferPool  (id=706)	
		availableMemorySegments	ArrayDeque<E>  (id=716)	
			elements	Object[16]  (id=717)	
			head	0	
			tail	0	
		currentPoolSize	60	
		isDestroyed	false	
		networkBufferPool	NetworkBufferPool  (id=354)	
			allBufferPools	HashSet<E>  (id=424)	
			availableMemorySegments	ArrayBlockingQueue<E>  (id=427)	
				count	0	
				items	Object[10240]  (id=674)	
				itrs	null	
				lock	ReentrantLock  (id=675)	
				notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=678)	
				notFull	AbstractQueuedSynchronizer$ConditionObject  (id=679)	
				putIndex	6954	
				takeIndex	6954	
			factoryLock	Object  (id=430)	
			isDestroyed	false	
			managedBufferPools	HashSet<E>  (id=431)	
			memorySegmentSize	32768	
			numTotalRequiredBuffers	3226	
			totalNumberOfMemorySegments	10240	
		numberOfRequestedMemorySegments	0	
		numberOfRequiredMemorySegments	32	
		owner	null	
		registeredListeners	ArrayDeque<E>  (id=718)	
	channelsWithEndOfPartitionEvents	BitSet  (id=707)	
	consumedResultId	IntermediateDataSetID  (id=708)	
	consumedSubpartitionIndex	24	
	executionId	ExecutionAttemptID  (id=709)	
	hasReceivedAllEndOfPartitionEvents	false	
	inputChannels	HashMap<K,V>  (id=710)	
	inputChannelsWithData	LinkedBlockingQueue<E>  (id=692)	
		capacity	2147483647	
		count	AtomicInteger  (id=698)	
			value	0	
		head	LinkedBlockingQueue$Node<E>  (id=701)	
		last	LinkedBlockingQueue$Node<E>  (id=701)	
		notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=691)	
		notFull	AbstractQueuedSynchronizer$ConditionObject  (id=703)	
		putLock	ReentrantLock  (id=704)	
		takeLock	ReentrantLock  (id=705)	
	isReleased	false	
	jobId	JobID  (id=711)	
	numberOfInputChannels	32	
	numberOfUninitializedChannels	0	
	owningTaskName	"Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)	
	partitionStateChecker	NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)	
	pendingEvents	ArrayList<E>  (id=713)	
	registeredListeners	CopyOnWriteArrayList<E>  (id=714)	
	requestedPartitionsFlag	true	
	requestLock	Object  (id=715)	
	retriggerLocalRequestTimer	null	
currentChannel	null	
{noformat}
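The second trace sits in {LinkedBlockingQueue.poll(long, TimeUnit)} on {inputChannelsWithData}, whose count is 0. The sketch below is a hypothetical model, assuming that incoming data for a channel must first be copied into a segment from the gate's {LocalBufferPool} before the channel is enqueued. With {numberOfRequestedMemorySegments=0} and the global pool empty, no channel is ever enqueued and each timed poll returns null; the {maxAttempts} limit exists only so the model terminates.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical model of an input gate; names follow the dump, the logic is an assumption.
final class InputGateModel {
    private final LinkedBlockingQueue<Integer> inputChannelsWithData = new LinkedBlockingQueue<>();
    // Stands in for the gate's LocalBufferPool; returns null when no segment is available.
    private final Supplier<byte[]> bufferSource;

    InputGateModel(Supplier<byte[]> bufferSource) {
        this.bufferSource = bufferSource;
    }

    /** Network side: data is accepted only if a segment is available to hold it. */
    boolean onDataArrived(int channelIndex) {
        byte[] segment = bufferSource.get();
        if (segment == null) {
            return false; // nowhere to put the data, so the channel is never marked as having data
        }
        inputChannelsWithData.add(channelIndex);
        return true;
    }

    /** Task side: timed polls for a channel with data, giving up after maxAttempts. */
    Integer nextChannelWithData(long timeoutMillis, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                Integer channel = inputChannelsWithData.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (channel != null) {
                    return channel;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }
}
```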



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)