Posted to issues@spark.apache.org by "liupengcheng (Jira)" <ji...@apache.org> on 2020/02/17 09:37:00 UTC

[jira] [Created] (SPARK-30849) Application failed due to failed to get MapStatuses broadcast

liupengcheng created SPARK-30849:
------------------------------------

             Summary: Application failed due to failed to get MapStatuses broadcast
                 Key: SPARK-30849
                 URL: https://issues.apache.org/jira/browse/SPARK-30849
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: liupengcheng
         Attachments: image-2020-02-16-11-13-18-195.png, image-2020-02-16-11-17-32-103.png

We recently encountered an issue in Spark 2.1. The exception is as follows:


{noformat}
	Job aborted due to stage failure: Task 18 in stage 2.0 failed 4 times, most recent failure: Lost task 18.3 in stage 2.0 (TID 13819, xxxx , executor 8): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_9_piece1 of broadcast_9
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1287)
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
	at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
	at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:775)
	at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
	at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:712)
	at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:774)
	at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:665)
	at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:603)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:57)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
{noformat}

I looked into the code and the logs. It seems the failure occurs because the mapStatuses broadcast id is sent to the executor, but the broadcast is then invalidated by the driver immediately, before the executor actually fetches it.

The scenario can be described as follows. Let's say we have an rdd1:
rdd2 = rdd1.repartition(100) // stage 0
rdd3 = rdd2.map(xxx)  // stage 1
rdd4 = rdd2.map(xxx)  // stage 2
// and then do some join and output result
rdd3.join(rdd4).save

When a FetchFailedException happens in stage 1, stage 0 and stage 1 are resubmitted and re-executed while stage 2 is still running. The tasks of stage 2 fetch the mapStatuses from the driver, but the mapStatuses cache is invalidated as soon as tasks of stage 0.1 complete and call registerMapOutput.
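
To make the suspected ordering concrete, here is a minimal sketch in plain Python. It is a toy model only, not Spark code: `ToyDriver`, `BroadcastMissing`, `run_race` and all method names are hypothetical, and it assumes (as described above) that registering a map output invalidates the cached mapStatuses broadcast on the driver.

```python
class BroadcastMissing(Exception):
    """Stands in for the 'Failed to get broadcast_N_pieceM' error."""


class ToyDriver:
    """Hypothetical, heavily simplified stand-in for the driver side."""

    def __init__(self):
        self.statuses = {}     # map partition -> output location
        self.blocks = {}       # broadcast id -> snapshot of statuses
        self.cached_id = None  # id of the currently cached broadcast
        self.next_id = 0

    def register_map_output(self, map_id, location):
        # Assumption: a new map output invalidates the cached broadcast
        # and removes the blocks that back it.
        self.statuses[map_id] = location
        if self.cached_id is not None:
            del self.blocks[self.cached_id]
            self.cached_id = None

    def get_serialized_statuses(self):
        # The executor receives only a broadcast id, not the data itself.
        if self.cached_id is None:
            self.cached_id = self.next_id
            self.next_id += 1
            self.blocks[self.cached_id] = dict(self.statuses)
        return self.cached_id

    def fetch_broadcast(self, broadcast_id):
        if broadcast_id not in self.blocks:
            raise BroadcastMissing(f"Failed to get broadcast_{broadcast_id}")
        return self.blocks[broadcast_id]


def run_race():
    driver = ToyDriver()
    driver.register_map_output(0, "executor-1")  # stage 0.0 finishes

    # A stage 2 task asks for the map statuses and gets a broadcast id.
    broadcast_id = driver.get_serialized_statuses()

    # Before the task reads the broadcast, a task of the resubmitted
    # stage 0.1 completes and registers its output.
    driver.register_map_output(0, "executor-2")

    try:
        driver.fetch_broadcast(broadcast_id)
    except BroadcastMissing as exc:
        return str(exc)
    return "fetched"
```

In this model `run_race()` ends in the error branch because the id handed to the stage 2 task no longer refers to a live broadcast. If the fetch happens before the second `register_map_output` call, it succeeds.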

I checked the master branch. It seems that the correctness issues with `repartition` have been fixed there, but I think this issue may still exist?

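Some screenshots: see the attachments listed above (image-2020-02-16-11-13-18-195.png, image-2020-02-16-11-17-32-103.png).
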