Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:43:15 UTC
[jira] [Resolved] (SPARK-25070) BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang long time
[ https://issues.apache.org/jira/browse/SPARK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-25070.
----------------------------------
Resolution: Incomplete
> BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang long time
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-25070
> URL: https://issues.apache.org/jira/browse/SPARK-25070
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: DENG FEI
> Priority: Major
> Labels: bulk-closed
>
> The task fetched the shuffle block successfully, but onBlockFetchSuccess then threw, leaving the task hung for a long time; speculation was false, so no speculative copy was launched.
> The log is below:
> 18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches in 16 ms
> 18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from /xxx.xxx.xxx.xxx:7337
> java.util.NoSuchElementException: key not found: shuffle_8_68_113
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:59)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:59)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
>     at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
>     at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
>     at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
>     at XXXXXX
> 18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver
> 18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 14.0 (TID 1552), reason: stage cancelled
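Read together, the WARN line and the two Executor lines show how the stage hangs: the exception escapes the listener on the network thread (TransportChannelHandler logs it as a WARN), so no result for shuffle_8_68_113 is ever enqueued, and the next event for that stage is the kill of task 113.0 about 19 hours later (18/08/08 14:55:53 to 18/08/09 10:02:32). The Scala sketch below is a hypothetical, heavily simplified model of that path: HangModel, FetchResult and SuccessResult are illustrative names, not Spark classes. In Spark 2.2 the task thread appears to wait on the results queue with a blocking take(); the sketch uses poll with a timeout so it terminates.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

// Hypothetical, simplified model of the fetch path; these are not Spark's classes.
sealed trait FetchResult
final case class SuccessResult(blockId: String, size: Long) extends FetchResult

object HangModel {
  // Stands in for the iterator's results queue, drained by the task thread.
  val results = new LinkedBlockingQueue[FetchResult]()
  // sizeMap lacks the block that actually arrives, mirroring the reported exception.
  val sizeMap = mutable.HashMap[String, Long]("shuffle_8_68_112" -> 1024L)

  // Network-thread callback with no try/catch: sizeMap(blockId) throws before
  // results.put runs, so the exception escapes and nothing is enqueued.
  def onBlockFetchSuccess(blockId: String): Unit =
    results.put(SuccessResult(blockId, sizeMap(blockId)))

  def main(args: Array[String]): Unit = {
    try {
      onBlockFetchSuccess("shuffle_8_68_113")
    } catch {
      // In the report, this is roughly where TransportChannelHandler logs the WARN.
      case e: NoSuchElementException => println(s"callback threw: ${e.getMessage}")
    }
    // A blocking take() here would never return; poll with a timeout instead.
    val next = results.poll(100, TimeUnit.MILLISECONDS)
    println(s"result after 100 ms: $next")
  }
}
```

Running main prints the "key not found: shuffle_8_68_113" message and then a null result: the consumer has nothing to read, which is the hang in miniature.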
> {code:java}
> val blockFetchingListener = new BlockFetchingListener {
>   override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
>     // Only add the buffer to results queue if the iterator is not zombie,
>     // i.e. cleanup() has not been called yet.
>     ShuffleBlockFetcherIterator.this.synchronized {
>       try {
>         if (!isZombie) {
>           // Increment the ref count because we need to pass this to a different thread.
>           // This needs to be released after use.
>           buf.retain()
>           remainingBlocks -= blockId
>           results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
>             remainingBlocks.isEmpty))
>           logDebug("remainingBlocks: " + remainingBlocks)
>         }
>       } catch {
>         // Proposed change: report any exception (e.g. a missing sizeMap entry) as a fetch
>         // failure so the task thread is not left waiting for a result that never arrives.
>         case e: Throwable => onBlockFetchFailure(blockId, e)
>       }
>     }
>     logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
>   }
>
>   override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
>     logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
>     results.put(new FailureFetchResult(BlockId(blockId), address, e))
>   }
> }
> {code}
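The proposed try/catch turns an exception inside onBlockFetchSuccess into a FailureFetchResult, so the task thread receives a failure instead of waiting indefinitely. The sketch below models that guard with hypothetical simplified types (GuardedListener, FetchResult, SuccessResult and FailureResult are illustrative, not Spark's API). It differs from the snippet above in two deliberate ways: it catches NonFatal(e) rather than every Throwable, the usual Scala idiom, and it performs the lookup that can throw before any other side effect. In the proposed code, buf.retain() and remainingBlocks -= blockId run before sizeMap(blockId), so on the failure path the retained buffer does not appear to be released.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical, simplified result types; not Spark's FetchResult classes.
sealed trait FetchResult
final case class SuccessResult(blockId: String, size: Long) extends FetchResult
final case class FailureResult(blockId: String, e: Throwable) extends FetchResult

object GuardedListener {
  val results = new LinkedBlockingQueue[FetchResult]()
  val sizeMap = mutable.HashMap[String, Long]("shuffle_8_68_112" -> 1024L)

  def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
    results.put(FailureResult(blockId, e))

  def onBlockFetchSuccess(blockId: String): Unit =
    try {
      // Do the lookup that can throw before any other side effect.
      val size = sizeMap(blockId)
      results.put(SuccessResult(blockId, size))
    } catch {
      // Turn an unexpected, non-fatal exception into a failure the task thread will see.
      case NonFatal(e) => onBlockFetchFailure(blockId, e)
    }
}
```

With this guard, calling GuardedListener.onBlockFetchSuccess("shuffle_8_68_113") enqueues a FailureResult carrying the NoSuchElementException, so a consumer's results.take() returns immediately rather than blocking.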
>
>
> {code:java}
> results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
>   remainingBlocks.isEmpty))
> {code}
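In this line, sizeMap(blockId) uses Map.apply, which throws NoSuchElementException when the key is absent; that matches the MapLike.apply and MapLike.default frames in the stack trace. An alternative to catching the exception is to make the missing key an explicit case with Map.get, which returns an Option. A minimal sketch, with a hypothetical helper lookupSize that is not part of Spark:

```scala
import scala.collection.mutable

object SizeLookup {
  // Hypothetical helper (not part of Spark): Map.get returns an Option, so a missing
  // block ID becomes a Left value instead of a NoSuchElementException from Map.apply.
  def lookupSize(sizeMap: collection.Map[String, Long], blockId: String): Either[String, Long] =
    sizeMap.get(blockId).toRight(s"no expected size recorded for $blockId")

  def main(args: Array[String]): Unit = {
    val sizeMap = mutable.HashMap[String, Long]("shuffle_8_68_112" -> 1024L)
    println(lookupSize(sizeMap, "shuffle_8_68_112")) // Right(1024)
    println(lookupSize(sizeMap, "shuffle_8_68_113")) // Left(no expected size recorded for shuffle_8_68_113)
  }
}
```

A caller would then route a Left to onBlockFetchFailure; that wiring is an assumption, not code from the report. Either approach only makes the task fail fast instead of hanging; neither explains why the fetched block ID was missing from sizeMap.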
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)