Posted to issues@spark.apache.org by "liupengcheng (Jira)" <ji...@apache.org> on 2020/02/17 09:37:00 UTC
[jira] [Created] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast
liupengcheng created SPARK-30849:
------------------------------------
Summary: Application failed due to failed to get MapStatuses broadcast
Key: SPARK-30849
URL: https://issues.apache.org/jira/browse/SPARK-30849
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.1.0
Reporter: liupengcheng
Attachments: image-2020-02-16-11-13-18-195.png, image-2020-02-16-11-17-32-103.png
We recently encountered an issue in Spark 2.1. The exception is as follows:
{noformat}
Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 13819, xxxx , executor 8): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}
I looked into the code and the logs. It seems the failure occurs because the mapStatuses broadcast id is sent to the executor, but the broadcast is invalidated by the driver before the executor actually fetches it.
This can be described as follows:
Let's say we have an rdd1,
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx) // stage 1
rdd4 = rdd2.map(xxx) // stage 2
// and then do some join and output result
rdd3.join(rdd4).save
When a FetchFailedException happens in stage 1, stage 0 and stage 1 are resubmitted and re-executed while stage 2 is still running. The tasks of stage 2 fetch the mapStatuses from the driver, but the mapStatuses cache is invalidated when tasks of stage 0.1 (the retry attempt of stage 0) complete and call registerMapOutput.
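The ordering described above can be mimicked with a small toy model. This is only a sketch for discussion, not Spark's code: ToyDriver, askForStatuses, and fetch are hypothetical names invented for illustration, and only registerMapOutput borrows its name from the description above. It assumes the driver replies to a status request with a broadcast id only, and that registering new map output destroys the cached broadcast.

```scala
import scala.collection.mutable

// Toy model only: these names are hypothetical and do not mirror Spark's internals.
object StaleBroadcastSketch {

  // Stands in for the driver, which owns the broadcast blocks.
  final class ToyDriver {
    private val blocks = mutable.Map.empty[Long, String]
    private var nextId = 0L
    private var cachedId: Option[Long] = None

    // An executor asks for map statuses; the reply carries only a broadcast id.
    def askForStatuses(statuses: String): Long = synchronized {
      cachedId.getOrElse {
        val id = nextId
        nextId += 1
        blocks(id) = statuses
        cachedId = Some(id)
        id
      }
    }

    // A re-run map task registers its output, so the cached broadcast is dropped.
    def registerMapOutput(): Unit = synchronized {
      cachedId.foreach(id => blocks.remove(id))
      cachedId = None
    }

    // The executor fetches the broadcast contents by id at some later point.
    def fetch(id: Long): Either[String, String] = synchronized {
      blocks.get(id).toRight(s"Failed to get broadcast_$id")
    }
  }

  def run(): (Either[String, String], Either[String, String]) = {
    val driver = new ToyDriver
    val id = driver.askForStatuses("statuses-v1") // stage 2 task receives an id
    driver.registerMapOutput()                    // a stage 0.1 task completes
    val stale = driver.fetch(id)                  // the stage 2 task fetches too late
    val freshId = driver.askForStatuses("statuses-v2")
    val fresh = driver.fetch(freshId)             // asking again yields a usable id
    (stale, fresh)
  }

  def main(args: Array[String]): Unit = {
    val (stale, fresh) = run()
    println(stale)
    println(fresh)
  }
}
```

In this model the first fetch returns a Left because the id was handed out before the invalidation, while a second request after the invalidation succeeds. Whether a retry of that kind is an appropriate fix in Spark itself is a separate question that this sketch does not answer.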
I checked the master branch. It seems that the correctness issues with `repartition` have been fixed there, but I think this issue may still exist.
Some screenshots are attached (see Attachments above).