You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Ashish Rawat <As...@guavus.com> on 2015/03/24 19:41:08 UTC

Spark Application Hung

Hi,

We are observing a hung spark application when one of the yarn datanode (running multiple spark executors) go down.

Setup details:

  *   Spark: 1.2.1
  *   Hadoop: 2.4.0
  *   Spark Application Mode: yarn-client
  *   2 datanodes (DN1, DN2)
  *   6 spark executors (initially 3 executors on both DN1 and DN2, after rebooting DN2, changes to 4 executors on DN1 and 2 executors on DN2)

Scenario:

When one of the datanodes (DN2) is brought down, the application gets hung, with spark driver continuously showing the following warning:

15/03/24 12:39:26 WARN TaskSetManager: Lost task 5.0 in stage 232.0 (TID 37941, DN1): FetchFailed(null, shuffleId=155, mapId=-1, reduceId=5, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 155
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)


When DN2 is brought down, one executor gets launched on DN1. When DN2 is brought back up after 15mins, 2 executors get launched on it.
All the executors (including the ones which got launched after DN2 comes back), keep showing the following errors:

15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 155, fetching them
15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 155, fetching them
15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker actor = Actor[akka.tcp://sparkDriver@NN1:44353/user/MapOutputTracker#-957394722]
15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Got the output locations
15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location for shuffle 155
15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location for shuffle 155
15/03/24 12:43:30 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 44623
15/03/24 12:43:30 INFO executor.Executor: Running task 5.0 in stage 232.960 (TID 44623)
15/03/24 12:43:30 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 44629
15/03/24 12:43:30 INFO executor.Executor: Running task 11.0 in stage 232.960 (TID 44629)
15/03/24 12:43:30 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 2275
15/03/24 12:43:30 INFO storage.MemoryStore: ensureFreeSpace(16308) called with curMem=44996, maxMem=5556708311
15/03/24 12:43:30 INFO storage.MemoryStore: Block broadcast_2275_piece0 stored as bytes in memory (estimated size 15.9 KB, free 5.2 GB)
15/03/24 12:43:30 INFO storage.BlockManagerMaster: Updated info of block broadcast_2275_piece0
15/03/24 12:43:30 INFO broadcast.TorrentBroadcast: Reading broadcast variable 2275 took 97 ms
15/03/24 12:43:30 INFO storage.MemoryStore: ensureFreeSpace(28688) called with curMem=61304, maxMem=5556708311
15/03/24 12:43:30 INFO storage.MemoryStore: Block broadcast_2275 stored as values in memory (estimated size 28.0 KB, free 5.2 GB)
15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location for shuffle 155
15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location for shuffle 155

The newly launched executors have almost 0 memory utilisation and are stuck with the above errors.

The driver has the following logs, just before the "Lost Task" messages begin to appear.

15/03/24 12:39:26 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 155 to sparkExecutor@DN1:35682
15/03/24 12:39:26 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 155 is 527 bytes
15/03/24 12:39:26 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 155 to sparkExecutor@DN1:34062
15/03/24 12:39:26 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 155 to sparkExecutor@DN1:45639

Before the application gets hung on shuffle id 155, there are similar warnings on other shuffle ids which get resolved in 2-3 attempts, but it never gets resolved for shuffle 155. Is it because the mapStatuses in MapOutputTrackerMaster in Driver has a corrupt state for shuffle 155? If yes, then how can the spark application reach such a state and aren't there suitable steps to recover?

Can someone please help in debugging this issue. We haven't yet restarted the system as this error may not be easily reproducible but resolving such issues is critical to take our application to production. Would appreciate some quick help!

Regards,
Ashish



Re: Spark Application Hung

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
In production, i'd suggest you having a High availability cluster with
minimum of 3 nodes (data nodes in your case).

Now lets examine your scenario:

- When you suddenly brings down one of the node which has 2 executors
running on it, what happens is that the node (DN2) will be having your jobs
shuffle data or computed data stored in it for the next stages (this is the
same effect as deleting your spark's local/work dir from DN1). The absence
of this node will lead to fetchFailures as you are seeing in the logs. But
eventually it will end up trying for sometime and i believe it will
recompute your whole pipeline on DN1



Thanks
Best Regards

On Wed, Mar 25, 2015 at 12:11 AM, Ashish Rawat <As...@guavus.com>
wrote:

>  Hi,
>
>  We are observing a hung spark application when one of the yarn datanode
> (running multiple spark executors) go down.
>
>  *Setup details*:
>
>    - Spark: 1.2.1
>    - Hadoop: 2.4.0
>    - Spark Application Mode: yarn-client
>    - 2 datanodes (DN1, DN2)
>    - 6 spark executors (initially 3 executors on both DN1 and DN2, after
>    rebooting DN2, changes to 4 executors on DN1 and 2 executors on DN2)
>
> *Scenario*:
>
>  When one of the datanodes (DN2) is brought down, the application gets
> hung, with spark driver continuously showing the following warning:
>
>  15/03/24 12:39:26 WARN TaskSetManager: Lost task 5.0 in stage 232.0 (TID
> 37941, DN1): FetchFailed(null, shuffleId=155, mapId=-1, reduceId=5, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 155
>         at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
>         at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>         at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
>         at
> org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
>         at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>         at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>         at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>         at java.lang.Thread.run(Unknown Source)
>
>
>  When DN2 is brought down, one executor gets launched on DN1. When DN2 is
> brought back up after 15mins, 2 executors get launched on it.
> All the executors (including the ones which got launched after DN2 comes
> back), keep showing the following errors:
>
>  15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Don't have map
> outputs for shuffle 155, fetching them
> 15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Don't have map
> outputs for shuffle 155, fetching them
> 15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Doing the fetch;
> tracker actor = Actor[akka.tcp://sparkDriver@NN1
> :44353/user/MapOutputTracker#-957394722]
> 15/03/24 12:43:30 INFO spark.MapOutputTrackerWorker: Got the output
> locations
> 15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location
> for shuffle 155
> 15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location
> for shuffle 155
> 15/03/24 12:43:30 INFO executor.CoarseGrainedExecutorBackend: Got assigned
> task 44623
> 15/03/24 12:43:30 INFO executor.Executor: Running task 5.0 in stage
> 232.960 (TID 44623)
> 15/03/24 12:43:30 INFO executor.CoarseGrainedExecutorBackend: Got assigned
> task 44629
> 15/03/24 12:43:30 INFO executor.Executor: Running task 11.0 in stage
> 232.960 (TID 44629)
> 15/03/24 12:43:30 INFO broadcast.TorrentBroadcast: Started reading
> broadcast variable 2275
> 15/03/24 12:43:30 INFO storage.MemoryStore: ensureFreeSpace(16308) called
> with curMem=44996, maxMem=5556708311
> 15/03/24 12:43:30 INFO storage.MemoryStore: Block broadcast_2275_piece0
> stored as bytes in memory (estimated size 15.9 KB, free 5.2 GB)
> 15/03/24 12:43:30 INFO storage.BlockManagerMaster: Updated info of block
> broadcast_2275_piece0
> 15/03/24 12:43:30 INFO broadcast.TorrentBroadcast: Reading broadcast
> variable 2275 took 97 ms
> 15/03/24 12:43:30 INFO storage.MemoryStore: ensureFreeSpace(28688) called
> with curMem=61304, maxMem=5556708311
> 15/03/24 12:43:30 INFO storage.MemoryStore: Block broadcast_2275 stored as
> values in memory (estimated size 28.0 KB, free 5.2 GB)
> 15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location
> for shuffle 155
> 15/03/24 12:43:30 ERROR spark.MapOutputTracker: Missing an output location
> for shuffle 155
>
>  The newly launched executors have almost 0 memory utilisation and are
> stuck with the above errors.
>
>  The driver has the following logs, just before the "Lost Task" messages
> begin to appear.
>
>  15/03/24 12:39:26 INFO MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 155 to sparkExecutor@DN1:35682
> 15/03/24 12:39:26 INFO MapOutputTrackerMaster: Size of output statuses for
> shuffle 155 is 527 bytes
> 15/03/24 12:39:26 INFO MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 155 to sparkExecutor@DN1:34062
> 15/03/24 12:39:26 INFO MapOutputTrackerMasterActor: Asked to send map
> output locations for shuffle 155 to sparkExecutor@DN1:45639
>
>  Before the application gets hung on shuffle id 155, there are similar
> warnings on other shuffle ids which get resolved in 2-3 attempts, but it
> never gets resolved for shuffle 155. Is it because the *mapStatuses* in
> MapOutputTrackerMaster in Driver has a corrupt state for shuffle 155? If
> yes, then how can the spark application reach such a state and aren't there
> suitable steps to recover?
>
>  Can someone please help in debugging this issue. We haven't yet
> restarted the system as this error may not be easily reproducible but
> resolving such issues is critical to take our application to production.
> Would appreciate some quick help!
>
>  Regards,
> Ashish
>
>
>