Posted to dev@spark.apache.org by Emil Ejbyfeldt <ee...@liveintent.com.INVALID> on 2022/02/02 11:58:50 UTC

MetadataFetchFailedException due to decommission block migrations

As noted in SPARK-34939, there is a race when using broadcast for map 
output statuses. The explanation from SPARK-34939:

 > After map statuses are broadcasted and the executors obtain 
serialized broadcasted map statuses. If any fetch failure happens after, 
Spark scheduler invalidates cached map statuses and destroy broadcasted 
value of the map statuses. Then any executor trying to deserialize 
serialized broadcasted map statuses and access broadcasted value, 
IOException will be thrown. Currently we don't catch it in 
MapOutputTrackerWorker and above exception will fail the application.

But when running with `spark.decommission.enabled=true` and 
`spark.storage.decommission.shuffleBlocks.enabled=true` there is another 
way to hit this race: when a node is decommissioning and its shuffle 
blocks are migrated. After a block has been migrated, an update is 
sent to the driver for each block, and the map output caches are 
invalidated.
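
For anyone trying to reproduce this, here is a small sketch of how the two 
settings above could be passed on the command line. The config keys are the 
ones named in this mail; the application name `my_job.py` and the helper 
`spark_submit_command` are hypothetical and only there for illustration.

```python
import shlex

# The two settings named in this mail. Everything else here is illustrative.
decommission_conf = {
    "spark.decommission.enabled": "true",
    "spark.storage.decommission.shuffleBlocks.enabled": "true",
}


def spark_submit_command(app, conf):
    """Build a spark-submit argument list with one --conf flag per setting."""
    cmd = ["spark-submit"]
    for key, value in conf.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app)  # hypothetical application file
    return cmd


command = spark_submit_command("my_job.py", decommission_conf)
print(shlex.join(command))
```

The same keys could equally be set in `spark-defaults.conf` or on a 
`SparkConf`; the command-line form is just the easiest to show here.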

Here are the driver logs from when we hit the race condition running with Spark 3.2.0:

2022-01-28 03:20:12,409 INFO memory.MemoryStore: Block broadcast_27 
stored as values in memory (estimated size 5.5 MiB, free 11.0 GiB)
2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output 
for 192108 to BlockManagerId(760, ip-10-231-63-204.ec2.internal, 34707, 
None)
2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output 
for 179529 to BlockManagerId(743, ip-10-231-34-160.ec2.internal, 44225, 
None)
2022-01-28 03:20:12,414 INFO spark.ShuffleStatus: Updating map output 
for 187194 to BlockManagerId(761, ip-10-231-43-219.ec2.internal, 39943, 
None)
2022-01-28 03:20:12,415 INFO spark.ShuffleStatus: Updating map output 
for 190303 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, 
None)
2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output 
for 192220 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, 
None)
2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output 
for 182306 to BlockManagerId(688, ip-10-231-43-41.ec2.internal, 35967, None)
2022-01-28 03:20:12,417 INFO spark.ShuffleStatus: Updating map output 
for 190387 to BlockManagerId(772, ip-10-231-55-173.ec2.internal, 35523, 
None)
2022-01-28 03:20:12,417 INFO memory.MemoryStore: Block 
broadcast_27_piece0 stored as bytes in memory (estimated size 4.0 MiB, 
free 10.9 GiB)
2022-01-28 03:20:12,417 INFO storage.BlockManagerInfo: Added 
broadcast_27_piece0 in memory on ip-10-231-63-1.ec2.internal:34761 
(size: 4.0 MiB, free: 11.0 GiB)
2022-01-28 03:20:12,418 INFO memory.MemoryStore: Block 
broadcast_27_piece1 stored as bytes in memory (estimated size 1520.4 
KiB, free 10.9 GiB)
2022-01-28 03:20:12,418 INFO storage.BlockManagerInfo: Added 
broadcast_27_piece1 in memory on ip-10-231-63-1.ec2.internal:34761 
(size: 1520.4 KiB, free: 11.0 GiB)
2022-01-28 03:20:12,418 INFO spark.MapOutputTracker: Broadcast 
outputstatuses size = 416, actual size = 5747443
2022-01-28 03:20:12,419 INFO spark.ShuffleStatus: Updating map output 
for 153389 to BlockManagerId(154, ip-10-231-42-104.ec2.internal, 44717, 
None)
2022-01-28 03:20:12,419 INFO broadcast.TorrentBroadcast: Destroying 
Broadcast(27) (from updateMapOutput at BlockManagerMasterEndpoint.scala:594)
2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Added 
rdd_65_20310 on disk on ip-10-231-32-25.ec2.internal:40657 (size: 77.6 MiB)
2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Removed 
broadcast_27_piece0 on ip-10-231-63-1.ec2.internal:34761 in memory 
(size: 4.0 MiB, free: 11.0 GiB)

While the broadcast is being constructed, updates keep coming in, and 
the broadcast is destroyed almost immediately. On this particular job we 
ended up hitting the race condition many times. It caused ~18 
task failures and stage retries within 20 seconds, causing us to hit our 
stage retry limit and the job to fail.
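
To make the sequence in the logs concrete, here is a toy model of the race 
in plain Python. It is not Spark code: `ToyBroadcast`, `ToyShuffleStatus` 
and `simulate` are made-up names, and the model only assumes what is 
described above, namely that every map output update destroys the cached 
broadcast, so a reader holding an older handle fails.

```python
class DestroyedBroadcastError(IOError):
    """Raised when a reader touches a broadcast that was already destroyed."""


class ToyBroadcast:
    """Stand-in for a broadcast of serialized map statuses (hypothetical)."""

    def __init__(self, broadcast_id, value):
        self.id = broadcast_id
        self._value = value
        self.destroyed = False

    def destroy(self):
        self.destroyed = True
        self._value = None

    def value(self):
        if self.destroyed:
            raise DestroyedBroadcastError(f"broadcast_{self.id} was destroyed")
        return self._value


class ToyShuffleStatus:
    """Stand-in for the driver-side map output cache (hypothetical)."""

    def __init__(self, locations):
        self.locations = dict(locations)  # map id -> executor name
        self._cached = None
        self._next_id = 0

    def serialized_statuses(self):
        # Build the broadcast lazily and cache it until the next update.
        if self._cached is None:
            self._cached = ToyBroadcast(self._next_id, dict(self.locations))
            self._next_id += 1
        return self._cached

    def update_map_output(self, map_id, new_location):
        # Each migrated block triggers an update, which invalidates the cache.
        self.locations[map_id] = new_location
        if self._cached is not None:
            self._cached.destroy()
            self._cached = None


def simulate(migrated_blocks):
    """Count readers that fail because an update lands before they read."""
    status = ToyShuffleStatus({map_id: "exec-old" for map_id in range(5)})
    failures = 0
    for map_id in migrated_blocks:
        handle = status.serialized_statuses()         # executor gets a handle
        status.update_map_output(map_id, "exec-new")  # migration update arrives
        try:
            handle.value()                            # executor reads too late
        except DestroyedBroadcastError:
            failures += 1
    return failures
```

In this model every interleaved update costs one failed read, which is the 
pattern we see when many blocks are migrated in a short time window.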

As far as I understand, this was the expected behavior for handling this 
case after SPARK-34939. But when combined with decommissioning, 
hitting the race seems to be a bit too common.

Is anyone else running into something similar?

I am willing to help develop a fix, but might need some guidance on 
how this case could be handled better.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org


Re: MetadataFetchFailedException due to decommission block migrations

Posted by Dongjoon Hyun <do...@gmail.com>.
Thank you for sharing, Emil.

> I am willing to help develop a fix, but might need some guidance on
> how this case could be handled better.

Could you file an official Apache JIRA for your finding and
propose a PR for that too with the test case? We can continue
our discussion on your PR.

Dongjoon.


