Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:45:08 UTC

[jira] [Resolved] (SPARK-22823) Race Condition when reading Broadcast shuffle input. Failed to get broadcast piece

     [ https://issues.apache.org/jira/browse/SPARK-22823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22823.
----------------------------------
    Resolution: Incomplete

> Race Condition when reading Broadcast shuffle input. Failed to get broadcast piece
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-22823
>                 URL: https://issues.apache.org/jira/browse/SPARK-22823
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.0.1, 2.2.1, 2.3.0
>            Reporter: Dmitrii Bundin
>            Priority: Major
>              Labels: bulk-closed
>
> It seems we have a race condition when reading shuffle input whose map output statuses are delivered as a broadcast rather than directly. To read the broadcast MapStatuses at
> {code:java}
> org.apache.spark.shuffle.BlockStoreShuffleReader::read()
> {code}
> we submit a GetMapOutputStatuses(shuffleId) message, which is executed in MapOutputTrackerMaster's thread pool and in turn creates a new broadcast at
> {code:java}
> org.apache.spark.MapOutputTracker::serializeMapStatuses
> {code}
> if the size of the statuses in bytes exceeds minBroadcastSize.
> As a result, the newly created broadcast may be registered in the driver's BlockManagerMasterEndpoint only after some executor has already asked the driver for the location of the broadcast piece.
> In our project we get the following exception on a regular basis:
> {code:java}
> java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1280)
>         at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
>         at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
>         at org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$1.apply(MapOutputTracker.scala:661)
>         at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
>         at org.apache.spark.MapOutputTracker$.logInfo(MapOutputTracker.scala:598)
>         at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:660)
>         at org.apache.spark.MapOutputTracker.getStatuses(MapOutputTracker.scala:203)
>         at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:142)
>         at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>         at org.apache.spark.scheduler.Task.run(Task.scala:86)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get broadcast_176_piece0 of broadcast_176
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1273)
> {code}
> This exception occurs when we try to read a broadcast piece. To do so, we need to fetch the location of the broadcast piece from the driver via
> {code:java}
> org.apache.spark.storage.BlockManagerMaster::getLocations(blockId: BlockId)
> {code}
> The driver responds with an empty list of locations, and fetching the broadcast piece fails with the exception listed above.
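
The ordering described in the report can be illustrated with a minimal, single-threaded Java sketch. It is not Spark code and does not reproduce Spark's internals: LocationRegistry, fetchPiece, and the "driver" holder name are hypothetical stand-ins for the driver-side location bookkeeping and the executor-side fetch. It only shows, under those assumptions, that a lookup made before registration sees an empty location list and fails, while the same lookup made after registration succeeds.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative only: a toy model of the lookup-before-registration ordering.
// None of these names are Spark APIs.
final class BroadcastRaceSketch {

    /** Hypothetical stand-in for the driver's record of which nodes hold a block. */
    static final class LocationRegistry {
        private final Map<String, List<String>> locations = new ConcurrentHashMap<>();

        /** Records that `holder` has a copy of `blockId`. */
        void register(String blockId, String holder) {
            locations.computeIfAbsent(blockId, k -> new CopyOnWriteArrayList<>()).add(holder);
        }

        /** Returns a snapshot of the known holders; empty if the block is not registered yet. */
        List<String> getLocations(String blockId) {
            List<String> found = locations.get(blockId);
            if (found == null) {
                return Collections.emptyList();
            }
            return new ArrayList<>(found);
        }
    }

    /** Hypothetical executor-side fetch: fails when no location is known. */
    static String fetchPiece(LocationRegistry registry, String blockId) {
        List<String> holders = registry.getLocations(blockId);
        if (holders.isEmpty()) {
            throw new IllegalStateException("Failed to get " + blockId);
        }
        return holders.get(0);
    }

    public static void main(String[] args) {
        LocationRegistry driver = new LocationRegistry();
        String piece = "broadcast_176_piece0";

        // The executor already knows the broadcast id, but the piece
        // has not been registered with the registry yet.
        try {
            fetchPiece(driver, piece);
        } catch (IllegalStateException e) {
            System.out.println("lookup before registration: " + e.getMessage());
        }

        // Registration completes; the same lookup now finds a holder.
        driver.register(piece, "driver");
        System.out.println("lookup after registration: " + fetchPiece(driver, piece));
    }
}
```

In the sketch the two orderings are forced sequentially so the outcome is deterministic; in the reported scenario the order would depend on timing between the driver and the executor.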



