Posted to user@spark.apache.org by Sadhan Sood <sa...@gmail.com> on 2014/11/12 18:31:12 UTC
Too many failed collects when trying to cache a table in SparkSQL
We are running Spark on YARN with a combined memory of > 1 TB. When we try to
cache a table partition (which is < 100 GB), we see a lot of failed collect
stages in the UI, and the caching never succeeds. Because of the failed
collects, the mapPartitions stage seems to keep getting resubmitted. We have
more than enough memory, so it's surprising that we are seeing this issue. Can
someone please help? Thanks!
The stack trace of the failed collect from the UI is:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Re: Too many failed collects when trying to cache a table in SparkSQL
Posted by Sadhan Sood <sa...@gmail.com>.
On re-running the cache statement, I see from the logs that whenever the
collect (stage 1) fails, it always leads to the mapPartitions stage (stage 0)
being re-run for one partition. The same error also appears in the container
log for the collect:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
The data is an LZO-compressed sequence file with a compressed size of ~26 GB.
Is there a way to understand why the shuffle keeps failing for one partition?
I believe we have enough memory to hold the uncompressed data.
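The pattern described above (a failed collect followed by a re-run of just one map partition) can be illustrated with a toy model. The Python sketch below is a hypothetical simplification, not Spark's MapOutputTracker: it assumes the driver keeps one output location per map task, drops the entries written by a lost executor, and fails any reduce-side lookup that hits a missing entry, which is roughly what the `convertMapStatuses` frames in the stack trace suggest. The numbers mirror the log (461 map tasks, 460/461 available after executor 52 is lost); which map task ran on executor 52, and the other executor IDs, are made up for illustration.

```python
class MetadataFetchFailed(Exception):
    """Hypothetical stand-in for Spark's MetadataFetchFailedException."""


class ToyMapOutputRegistry:
    """Hypothetical, simplified registry of shuffle map-output locations."""

    def __init__(self, shuffle_id, num_map_tasks):
        self.shuffle_id = shuffle_id
        # One slot per map task; None means "no known output location".
        self.locations = [None] * num_map_tasks

    def register(self, map_id, executor_id):
        self.locations[map_id] = executor_id

    def remove_outputs_on_executor(self, executor_id):
        # Outputs written by a lost executor are treated as gone.
        self.locations = [None if loc == executor_id else loc
                          for loc in self.locations]

    def available(self):
        return sum(loc is not None for loc in self.locations)

    def missing_map_ids(self):
        return [i for i, loc in enumerate(self.locations) if loc is None]

    def fetch_locations(self):
        # A reduce task needs every map output, so a single gap fails the fetch.
        if any(loc is None for loc in self.locations):
            raise MetadataFetchFailed(
                f"Missing an output location for shuffle {self.shuffle_id}")
        return list(self.locations)


registry = ToyMapOutputRegistry(shuffle_id=0, num_map_tasks=461)
# Assumption: map task 0 ran on executor 52; the others ran on made-up executors.
for map_id in range(461):
    registry.register(map_id, 52 if map_id == 0 else 1000 + map_id)

registry.remove_outputs_on_executor(52)
print(f"{registry.available()}/461 map outputs available")
try:
    registry.fetch_locations()
except MetadataFetchFailed as err:
    print("reduce task failed:", err)
print("map tasks to resubmit:", registry.missing_map_ids())
```

In this model only the missing entry has to be recomputed, which roughly corresponds to the "Submitting 1 missing tasks from Stage 0" line in the driver log.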
On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood <sa...@gmail.com> wrote:
> This is the log output:
> [...]
Re: Too many failed collects when trying to cache a table in SparkSQL
Posted by Sadhan Sood <sa...@gmail.com>.
This is the log output:
2014-11-12 19:07:16,561 INFO thriftserver.SparkExecuteStatementOperation (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS SELECT * FROM xyz where date_prefix = 20141112'
2014-11-12 19:07:17,455 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2014-11-12 19:07:17,756 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at TableReader.scala:68
2014-11-12 19:07:18,292 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
2014-11-12 19:07:22,801 INFO mapred.FileInputFormat (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
2014-11-12 19:07:22,835 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at Exchange.scala:86)
2014-11-12 19:07:22,837 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at SparkPlan.scala:84)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
2014-11-12 19:07:22,842 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
2014-11-12 19:07:22,871 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:07:22,916 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at DAGScheduler.scala:838
2014-11-12 19:07:22,963 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:10:04,088 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 161.113 s
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:10:04,094 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:10:04,097 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
2014-11-12 19:10:04,112 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at DAGScheduler.scala:838
2014-11-12 19:10:04,115 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 52 on ip-10-61-175-167.ec2.internal: remote Akka client disassociated
2014-11-12 19:10:08,543 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
2014-11-12 19:10:08,550 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
2014-11-12 19:10:08,555 INFO scheduler.Stage (Logging.scala:logInfo(59)) - Stage 0 is now unavailable on executor 52 (460/461, false)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at SparkPlan.scala:84) as failed due to a fetch failure from Stage 0 (mapPartitions at Exchange.scala:86)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84) failed in 4.571 s
2014-11-12 19:10:08,687 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch failure
2014-11-12 19:10:08,908 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Resubmitting failed stages
2014-11-12 19:10:08,974 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:10:08,989 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at DAGScheduler.scala:838
2014-11-12 19:10:08,990 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86) finished in 66.475 s
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84), which is now runnable
2014-11-12 19:11:15,482 INFO spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at DAGScheduler.scala:838
2014-11-12 19:11:15,482 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1 (MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler (Logging.scala:logError(75)) - Lost executor 372 on ip-10-95-163-84.ec2.internal: remote Akka client disassociated
2014-11-12 19:11:21,655 WARN remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system [akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Asked to remove non-existent executor 372
2014-11-12 19:11:21,655 INFO scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)
Re: Too many failed collects when trying to cache a table in SparkSQL
Posted by Sadhan Sood <sa...@gmail.com>.
This is the log output:
2014-11-12 19:07:16,561 INFO thriftserver.SparkExecuteStatementOperation
(Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
SELECT * FROM xyz where date_prefix = 20141112'
2014-11-12 19:07:17,455 INFO Configuration.deprecation
(Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
deprecated. Instead, use mapreduce.job.maps
2014-11-12 19:07:17,756 INFO spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
TableReader.scala:68
2014-11-12 19:07:18,292 INFO spark.SparkContext
(Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
2014-11-12 19:07:22,801 INFO mapred.FileInputFormat
(FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
2014-11-12 19:07:22,835 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
Exchange.scala:86)
2014-11-12 19:07:22,837 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
with 1 output partitions (allowLocal=false)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
SparkPlan.scala:84)
2014-11-12 19:07:22,838 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
2014-11-12 19:07:22,842 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
2014-11-12 19:07:22,871 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:07:22,916 INFO spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
DAGScheduler.scala:838
2014-11-12 19:07:22,963 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:10:04,088 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 161.113 s
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:10:04,089 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:10:04,090 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:10:04,094 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:10:04,097 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable
2014-11-12 19:10:04,112 INFO spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
DAGScheduler.scala:838
2014-11-12 19:10:04,115 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 52 on
ip-10-61-175-167.ec2.internal: remote Akka client disassociated
2014-11-12 19:10:08,543 WARN remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 52
2014-11-12 19:10:08,550 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
2014-11-12 19:10:08,555 INFO scheduler.Stage (Logging.scala:logInfo(59)) -
Stage 0 is now unavailable on executor 52 (460/461, false)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
(mapPartitions at Exchange.scala:86)
2014-11-12 19:10:08,686 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84)
failed in 4.571 s
2014-11-12 19:10:08,687 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at
Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch
failure
2014-11-12 19:10:08,908 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting failed stages
2014-11-12 19:10:08,974 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents
2014-11-12 19:10:08,989 INFO spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at
DAGScheduler.scala:838
2014-11-12 19:10:08,990 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 66.475 s
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
2014-11-12 19:11:15,465 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
2014-11-12 19:11:15,466 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable
2014-11-12 19:11:15,482 INFO spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
DAGScheduler.scala:838
2014-11-12 19:11:15,482 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)
2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 372 on
ip-10-95-163-84.ec2.internal: remote Akka client disassociated
2014-11-12 19:11:21,655 WARN remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].
2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 372
2014-11-12 19:11:21,655 INFO scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)
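The log follows a repeating pattern:

1. Stage 0 (the shuffle map side, 461 tasks) completes.
2. Stage 1 (a single collect task) starts.
3. An executor is lost a few seconds later, and Stage 0 drops to 460/461 available outputs.
4. Stage 1 fails on a fetch failure.
5. Only the 1 missing map task is resubmitted, and then Stage 1 runs again.

The Python sketch below is a toy model of that loop, not Spark's implementation. It rests on these assumptions:

- ToyMapOutputTracker, MetadataFetchFailed and simulate are hypothetical names.
- Each map output lives only on the executor that wrote it, with no external shuffle service.
- The model makes no attempt to explain why the executors were lost.

```python
from typing import Dict, List, Optional, Sequence, Tuple


class MetadataFetchFailed(Exception):
    """Toy stand-in for Spark's MetadataFetchFailedException."""


class ToyMapOutputTracker:
    """Hypothetical, simplified model: map partition id -> executor holding its output."""

    def __init__(self) -> None:
        self.locations: Dict[int, int] = {}

    def register(self, map_id: int, executor: int) -> None:
        self.locations[map_id] = executor

    def executor_lost(self, executor: int) -> None:
        # Assumption: shuffle files written by a lost executor disappear with it.
        self.locations = {m: e for m, e in self.locations.items() if e != executor}

    def missing(self, num_maps: int) -> List[int]:
        return [m for m in range(num_maps) if m not in self.locations]

    def fetch_all(self, num_maps: int) -> Dict[int, int]:
        # The single reduce task needs every map output; one hole fails the fetch.
        gaps = self.missing(num_maps)
        if gaps:
            raise MetadataFetchFailed(f"missing output location for maps {gaps}")
        return dict(self.locations)


def simulate(num_maps: int, executors: Sequence[int],
             lost_after_attempt: Sequence[Optional[int]],
             max_attempts: int = 5) -> List[Tuple[str, str]]:
    """Replay map/reduce attempts, losing lost_after_attempt[i] after map attempt i."""
    tracker = ToyMapOutputTracker()
    to_run = list(range(num_maps))
    history: List[Tuple[str, str]] = []
    for attempt in range(max_attempts):
        # Map stage: only partitions with no registered output are (re)run,
        # assigned round-robin over the available executors.
        for i, m in enumerate(to_run):
            tracker.register(m, executors[i % len(executors)])
        history.append(("map", f"submitted {len(to_run)} tasks"))
        # Optionally lose one executor before the reduce task fetches its inputs.
        if attempt < len(lost_after_attempt) and lost_after_attempt[attempt] is not None:
            tracker.executor_lost(lost_after_attempt[attempt])
        try:
            tracker.fetch_all(num_maps)
        except MetadataFetchFailed:
            # Fetch failure: resubmit only the map partitions whose output is gone.
            to_run = tracker.missing(num_maps)
            available = num_maps - len(to_run)
            history.append(("reduce", f"fetch failed, {available}/{num_maps} outputs available"))
            continue
        history.append(("reduce", "succeeded"))
        break
    return history
```

Calling simulate(461, list(range(461)), [52, 372]) reproduces the shape of the log. It submits 461 map tasks, hits a fetch failure with 460/461 outputs available, resubmits 1 task, and fails again after the second loss. If an executor holding map output dies on every attempt, the loop never reaches a successful reduce. That matches the repeated resubmission of mapPartitions described in the original post.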