Posted to issues@flink.apache.org by "Amit Jain (JIRA)" <ji...@apache.org> on 2018/04/03 09:19:00 UTC

[jira] [Updated] (FLINK-2685) TaskManager deadlock on NetworkBufferPool

     [ https://issues.apache.org/jira/browse/FLINK-2685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Jain updated FLINK-2685:
-----------------------------
    Attachment: job_manager_19_feb_15_30_running
                task_manager_19_feb_15_30_running

> TaskManager deadlock on NetworkBufferPool
> -----------------------------------------
>
>                 Key: FLINK-2685
>                 URL: https://issues.apache.org/jira/browse/FLINK-2685
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 0.10.0
>            Reporter: Greg Hogan
>            Assignee: Ufuk Celebi
>            Priority: Major
>         Attachments: job_manager_19_feb_15_30_running, task_manager_19_feb_15_30_running
>
>
> This deadlock occurs intermittently. I have a {{join}} followed by a {{chain<join,filter>}} followed by a {{reduceGroup}}. Stack traces and local variables from one thread of each {{join}} are below.
> The {{join}}s are waiting on a buffer to become available ({{networkBufferPool.availableMemorySegments.count=0}}). Both {{LocalBufferPool}}s have been given extra capacity ({{currentPoolSize=60 > numberOfRequiredMemorySegments=32}}). The first {{join}} is at full capacity ({{currentPoolSize=numberOfRequestedMemorySegments=60}}), yet the second {{join}} has not acquired any segments ({{numberOfRequestedMemorySegments=0}}).
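A scaled-down, hypothetical model of the accounting above. This is not Flink's implementation: the class names {{GlobalPool}}/{{LocalPool}}, the method {{tryRequest}} and the sizes are made up for illustration. The point it shows is that a local pool's {{currentPoolSize}} is only a quota, not a reservation, so a pool can be under quota and still get nothing once the shared pool is empty.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical, simplified model of the pool accounting in this issue; not Flink's code. */
public class BufferPoolAccounting {

    /** Stand-in for NetworkBufferPool: a fixed set of segments shared by all local pools. */
    static final class GlobalPool {
        final ArrayBlockingQueue<byte[]> available;

        GlobalPool(int totalSegments, int segmentSize) {
            available = new ArrayBlockingQueue<>(totalSegments);
            for (int i = 0; i < totalSegments; i++) {
                available.add(new byte[segmentSize]);
            }
        }

        /** Non-blocking: returns null when every segment has already been handed out. */
        byte[] requestSegment() {
            return available.poll();
        }
    }

    /** Stand-in for LocalBufferPool: currentPoolSize is a quota NOT backed by reserved segments. */
    static final class LocalPool {
        final GlobalPool global;
        final ArrayDeque<byte[]> available = new ArrayDeque<>();
        final int currentPoolSize;
        int numberOfRequestedSegments;

        LocalPool(GlobalPool global, int currentPoolSize) {
            this.global = global;
            this.currentPoolSize = currentPoolSize;
        }

        /** Local cache first, then the global pool if still under quota; null if neither works. */
        byte[] tryRequest() {
            byte[] seg = available.poll();
            if (seg != null) {
                return seg;
            }
            if (numberOfRequestedSegments < currentPoolSize) {
                seg = global.requestSegment();
                if (seg != null) {
                    numberOfRequestedSegments++;
                    return seg;
                }
            }
            return null; // a blocking caller would have to wait here
        }
    }

    public static void main(String[] args) {
        // Hypothetical sizes: the shared pool holds exactly one local pool's quota.
        GlobalPool global = new GlobalPool(60, 32);
        LocalPool first = new LocalPool(global, 60);
        LocalPool second = new LocalPool(global, 60);

        // The first pool drains the shared pool; its segments stay in flight (never recycled).
        int held = 0;
        while (first.tryRequest() != null) {
            held++;
        }

        System.out.println("first requested=" + first.numberOfRequestedSegments + ", held=" + held);
        System.out.println("second gets a segment? " + (second.tryRequest() != null));
    }
}
```

Running it prints {{first requested=60, held=60}} and {{second gets a segment? false}}: the second pool is 60 segments under quota but the shared pool has nothing left, which mirrors the {{count=0}} / {{numberOfRequestedMemorySegments=0}} values in the dumps.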
> {{LocalBufferPool.returnExcessMemorySegments}} only recycles {{MemorySegment}}s from its {{availableMemorySegments}}, so any requested {{Buffer}}s are released only when they are explicitly recycled.
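A minimal sketch of the shrink behaviour described above, assuming a simplified pool where the only way to give segments back is to take them from the idle deque. The names ({{setNumBuffers}}, {{returnExcessSegments}}, {{returnedToGlobal}}) are hypothetical and this is not Flink's code; it only illustrates why in-flight segments are not reclaimed until they are recycled.

```java
import java.util.ArrayDeque;

/** Hypothetical, simplified model of returning excess segments; not Flink's code. */
public class ReturnExcessSketch {

    static final class LocalPool {
        final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>(); // idle, cached segments
        final ArrayDeque<byte[]> returnedToGlobal = new ArrayDeque<>();  // stand-in for the shared pool
        int currentPoolSize;
        int numberOfRequestedSegments;

        /** Shrinks the quota and gives back only segments that are currently idle. */
        void setNumBuffers(int newSize) {
            currentPoolSize = newSize;
            returnExcessSegments();
        }

        private void returnExcessSegments() {
            while (numberOfRequestedSegments > currentPoolSize) {
                byte[] segment = availableSegments.poll();
                if (segment == null) {
                    return; // the remaining excess is in use and comes back only via recycle()
                }
                numberOfRequestedSegments--;
                returnedToGlobal.add(segment);
            }
        }

        /** Called when a consumer is finished with a buffer. */
        void recycle(byte[] segment) {
            availableSegments.add(segment);
            returnExcessSegments();
        }
    }

    public static void main(String[] args) {
        LocalPool pool = new LocalPool();
        pool.currentPoolSize = 60;
        pool.numberOfRequestedSegments = 60; // all 60 segments are in flight, none cached

        pool.setNumBuffers(32);
        System.out.println("after shrink: requested=" + pool.numberOfRequestedSegments
                + ", returned=" + pool.returnedToGlobal.size());

        pool.recycle(new byte[32]); // one in-flight buffer is finally recycled
        System.out.println("after one recycle: requested=" + pool.numberOfRequestedSegments
                + ", returned=" + pool.returnedToGlobal.size());
    }
}
```

Shrinking to 32 returns nothing ({{requested=60, returned=0}}); only the later recycle lets one segment go back ({{requested=59, returned=1}}).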
> First join stack trace and variable values from {{LocalBufferPool.requestBuffer}}:
> {noformat}
> owns: SpanningRecordSerializer<T>  (id=723)	
> waiting for: ArrayDeque<E>  (id=724)	
> Object.wait(long) line: not available [native method]	
> LocalBufferPool.requestBuffer(boolean) line: 163	
> LocalBufferPool.requestBufferBlocking() line: 133	
> RecordWriter<T>.emit(T) line: 92	
> OutputCollector<T>.collect(T) line: 65	
> JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088	
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137	
> JoinDriver<IT1,IT2,OT>.run() line: 208	
> RegularPactTask<S,OT>.run() line: 489	
> RegularPactTask<S,OT>.invoke() line: 354	
> Task.run() line: 581	
> Thread.run() line: 745	
> {noformat}
> {noformat}
> this	LocalBufferPool  (id=403)	
> 	availableMemorySegments	ArrayDeque<E>  (id=398)	
> 		elements	Object[16]  (id=422)	
> 		head	14	
> 		tail	14	
> 	currentPoolSize	60	
> 	isDestroyed	false	
> 	networkBufferPool	NetworkBufferPool  (id=354)	
> 		allBufferPools	HashSet<E>  (id=424)	
> 		availableMemorySegments	ArrayBlockingQueue<E>  (id=427)	
> 			count	0	
> 			items	Object[10240]  (id=674)	
> 			itrs	null	
> 			lock	ReentrantLock  (id=675)	
> 			notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=678)	
> 			notFull	AbstractQueuedSynchronizer$ConditionObject  (id=679)	
> 			putIndex	6954	
> 			takeIndex	6954	
> 		factoryLock	Object  (id=430)	
> 		isDestroyed	false	
> 		managedBufferPools	HashSet<E>  (id=431)	
> 		memorySegmentSize	32768	
> 		numTotalRequiredBuffers	3226	
> 		totalNumberOfMemorySegments	10240	
> 	numberOfRequestedMemorySegments	60	
> 	numberOfRequiredMemorySegments	32	
> 	owner	null	
> 	registeredListeners	ArrayDeque<E>  (id=421)	
> 		elements	Object[16]  (id=685)	
> 		head	0	
> 		tail	0	
> askToRecycle	false	
> isBlocking	true	
> {noformat}
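The first dump shows the thread in {{Object.wait(long)}} while "waiting for" the pool's {{ArrayDeque}}, with {{isBlocking=true}} and {{askToRecycle=false}}. A minimal sketch of that wait/notify pattern, assuming the local deque itself is the monitor and that only {{recycle}} wakes waiters; the class and method names are hypothetical, and the deadline parameter exists only so the demo cannot hang.

```java
import java.util.ArrayDeque;

/** Hypothetical model of a blocking buffer request using the deque as monitor; not Flink's code. */
public class BlockingRequestSketch {

    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();

    /** Waits until a segment is recycled into this pool, or returns null after maxWaitMillis. */
    byte[] requestBlocking(long maxWaitMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMillis;
        synchronized (availableSegments) {
            while (availableSegments.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null; // demo-only deadline; the stuck thread in the report is still waiting
                }
                // Timed wait, re-checked in a loop to tolerate spurious wake-ups.
                availableSegments.wait(remaining);
            }
            return availableSegments.poll();
        }
    }

    /** Returns a segment to the pool and wakes up one waiting requester. */
    void recycle(byte[] segment) {
        synchronized (availableSegments) {
            availableSegments.add(segment);
            availableSegments.notify();
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingRequestSketch pool = new BlockingRequestSketch();

        // Nobody recycles, so the request can only time out.
        System.out.println("without recycle: " + pool.requestBlocking(200));

        // Another thread recycles a segment, which unblocks the requester.
        Thread recycler = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            pool.recycle(new byte[32]);
        });
        recycler.start();
        byte[] segment = pool.requestBlocking(5_000);
        recycler.join();
        System.out.println("with recycle: " + (segment != null));
    }
}
```

The design point this illustrates: a thread blocked this way can only be woken by a recycle into the same local pool, so if all of that pool's segments are held downstream and never recycled, the wait never ends.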
> Second join stack trace and variable values from {{SingleInputGate.getNextBufferOrEvent}}:
> {noformat}
> Unsafe.park(boolean, long) line: not available [native method]	
> LockSupport.parkNanos(Object, long) line: 215	
> AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078	
> LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467	
> SingleInputGate.getNextBufferOrEvent() line: 414	
> MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79	
> MutableRecordReader<T>.next(T) line: 34	
> ReaderIterator<T>.next(T) line: 59	
> MutableHashTable$ProbeIterator<PT>.next() line: 1581	
> MutableHashTable<BT,PT>.processProbeIter() line: 457	
> MutableHashTable<BT,PT>.nextRecord() line: 555	
> ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110	
> JoinDriver<IT1,IT2,OT>.run() line: 208	
> RegularPactTask<S,OT>.run() line: 489	
> RegularPactTask<S,OT>.invoke() line: 354	
> Task.run() line: 581	
> Thread.run() line: 745	
> {noformat}
> {noformat}
> this	SingleInputGate  (id=693)	
> 	bufferPool	LocalBufferPool  (id=706)	
> 		availableMemorySegments	ArrayDeque<E>  (id=716)	
> 			elements	Object[16]  (id=717)	
> 			head	0	
> 			tail	0	
> 		currentPoolSize	60	
> 		isDestroyed	false	
> 		networkBufferPool	NetworkBufferPool  (id=354)	
> 			allBufferPools	HashSet<E>  (id=424)	
> 			availableMemorySegments	ArrayBlockingQueue<E>  (id=427)	
> 				count	0	
> 				items	Object[10240]  (id=674)	
> 				itrs	null	
> 				lock	ReentrantLock  (id=675)	
> 				notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=678)	
> 				notFull	AbstractQueuedSynchronizer$ConditionObject  (id=679)	
> 				putIndex	6954	
> 				takeIndex	6954	
> 			factoryLock	Object  (id=430)	
> 			isDestroyed	false	
> 			managedBufferPools	HashSet<E>  (id=431)	
> 			memorySegmentSize	32768	
> 			numTotalRequiredBuffers	3226	
> 			totalNumberOfMemorySegments	10240	
> 		numberOfRequestedMemorySegments	0	
> 		numberOfRequiredMemorySegments	32	
> 		owner	null	
> 		registeredListeners	ArrayDeque<E>  (id=718)	
> 	channelsWithEndOfPartitionEvents	BitSet  (id=707)	
> 	consumedResultId	IntermediateDataSetID  (id=708)	
> 	consumedSubpartitionIndex	24	
> 	executionId	ExecutionAttemptID  (id=709)	
> 	hasReceivedAllEndOfPartitionEvents	false	
> 	inputChannels	HashMap<K,V>  (id=710)	
> 	inputChannelsWithData	LinkedBlockingQueue<E>  (id=692)	
> 		capacity	2147483647	
> 		count	AtomicInteger  (id=698)	
> 			value	0	
> 		head	LinkedBlockingQueue$Node<E>  (id=701)	
> 		last	LinkedBlockingQueue$Node<E>  (id=701)	
> 		notEmpty	AbstractQueuedSynchronizer$ConditionObject  (id=691)	
> 		notFull	AbstractQueuedSynchronizer$ConditionObject  (id=703)	
> 		putLock	ReentrantLock  (id=704)	
> 		takeLock	ReentrantLock  (id=705)	
> 	isReleased	false	
> 	jobId	JobID  (id=711)	
> 	numberOfInputChannels	32	
> 	numberOfUninitializedChannels	0	
> 	owningTaskName	"Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)	
> 	partitionStateChecker	NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)	
> 	pendingEvents	ArrayList<E>  (id=713)	
> 	registeredListeners	CopyOnWriteArrayList<E>  (id=714)	
> 	requestedPartitionsFlag	true	
> 	requestLock	Object  (id=715)	
> 	retriggerLocalRequestTimer	null	
> currentChannel	null	
> {noformat}
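The second dump shows the gate polling {{inputChannelsWithData}} ({{LinkedBlockingQueue.poll(long, TimeUnit)}}) with {{count=0}}, while its {{LocalBufferPool}} has {{numberOfRequestedMemorySegments=0}}. A minimal, hypothetical sketch of such a polling loop (names like {{nextChannelWithData}} and the 100 ms timeout are assumptions, not Flink's code): the reader only progresses when some channel is enqueued as having data, or when the gate is released.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of an input gate waiting for any channel with data; not Flink's code. */
public class InputGateSketch {

    // Indices of input channels that currently have data (cf. inputChannelsWithData in the dump).
    private final LinkedBlockingQueue<Integer> channelsWithData = new LinkedBlockingQueue<>();
    private volatile boolean released;

    /** Called by a channel once data for it has arrived. */
    void notifyChannelNonEmpty(int channelIndex) {
        channelsWithData.add(channelIndex);
    }

    void release() {
        released = true;
    }

    /** Returns the next channel with data, or -1 once the gate is released. */
    int nextChannelWithData() throws InterruptedException {
        while (!released) {
            // Timed poll so the loop can periodically re-check the released flag.
            Integer channel = channelsWithData.poll(100, TimeUnit.MILLISECONDS);
            if (channel != null) {
                return channel;
            }
            // Nothing enqueued yet: if nothing is ever enqueued, this loop keeps waiting.
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        InputGateSketch gate = new InputGateSketch();
        gate.notifyChannelNonEmpty(3);
        System.out.println("next channel: " + gate.nextChannelWithData());

        // With an empty queue, the reader leaves the loop only because the gate is released.
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            gate.release();
        });
        releaser.start();
        System.out.println("after release: " + gate.nextChannelWithData());
        releaser.join();
    }
}
```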


