Posted to user@spark.apache.org by Sadhan Sood <sa...@gmail.com> on 2014/11/12 18:31:12 UTC

Too many failed collects when trying to cache a table in SparkSQL

We are running Spark on YARN with combined memory > 1 TB, and when trying to
cache a table partition (which is < 100 GB) we are seeing a lot of failed
collect stages in the UI; the caching never succeeds. Because of the failed
collect, it seems like the mapPartitions stage keeps getting resubmitted. We
have more than enough memory, so it's surprising we are seeing this issue.
Can someone please help? Thanks!
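
For reference, the cache statement we run (table names anonymized; it is the
same statement that shows up in the logs quoted later in this thread) is:

    CACHE TABLE xyz_cached AS
    SELECT * FROM xyz WHERE date_prefix = 20141112;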

The stack trace of the failed collect from the UI is:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
	at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:56)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Re: Too many failed collects when trying to cache a table in SparkSQL

Posted by Sadhan Sood <sa...@gmail.com>.
On re-running the cache statement, I see from the logs that when collect
(stage 1) fails, it always leads to mapPartitions (stage 0) being re-run for
one partition. This can be seen in the collect log as well as in the
container log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0

The data is an LZO-compressed sequence file with a compressed size of ~26 GB.
Is there a way to understand why the shuffle keeps failing for one partition?
I believe we have enough memory to store the uncompressed data in memory.
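
One way to dig into this (an assumption on our side, not something we have
verified yet) is to pull the container logs for the lost executors once the
application finishes, assuming YARN log aggregation is enabled, and look for
the executor being killed for exceeding its container memory limit, since
that would produce exactly this pattern of lost executors and fetch failures:

    # The application id below is hypothetical -- substitute the real one
    # from the YARN ResourceManager UI.
    yarn logs -applicationId application_1415800000000_0001 > app.log
    grep -B 2 -A 10 "Killing container" app.log

If that is what is happening, raising spark.yarn.executor.memoryOverhead
might be worth testing (again, a guess on our part, not a confirmed fix).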

On Wed, Nov 12, 2014 at 2:50 PM, Sadhan Sood <sa...@gmail.com> wrote:

> This is the log output:
>
> 2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
> (Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
> SELECT * FROM xyz where date_prefix = 20141112'
>
> 2014-11-12 19:07:17,455 INFO  Configuration.deprecation
> (Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
> deprecated. Instead, use mapreduce.job.maps
>
> 2014-11-12 19:07:17,756 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
> TableReader.scala:68
>
> 2014-11-12 19:07:18,292 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84
>
> 2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
> (FileInputFormat.java:listStatus(253)) - Total input paths to process : 200
>
> 2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
> Exchange.scala:86)
>
> 2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
> with 1 output partitions (allowLocal=false)
>
> 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
> SparkPlan.scala:84)
>
> 2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)
>
> 2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)
>
> 2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
> mapPartitions at Exchange.scala:86), which has no missing parents
>
> 2014-11-12 19:07:22,916 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
> (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
>
> 2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
> finished in 161.113 s
>
> 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - looking for newly runnable stages
>
> 2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - running: Set()
>
> 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
>
> 2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - failed: Set()
>
> 2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
>
> 2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
> SparkPlan.scala:84), which is now runnable
>
> 2014-11-12 19:10:04,112 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
> (MappedRDD[16] at map at SparkPlan.scala:84)
>
> 2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
> (Logging.scala:logError(75)) - Lost executor 52 on
> ip-10-61-175-167.ec2.internal: remote Akka client disassociated
>
> 2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
> (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
> [akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
> (Logging.scala:logError(75)) - Asked to remove non-existent executor 52
>
> 2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)
>
> 2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.scala:logInfo(59))
> - Stage 0 is now unavailable on executor 52 (460/461, false)
>
> 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
> SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
> (mapPartitions at Exchange.scala:86)
>
> 2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84)
> failed in 4.571 s
>
> 2014-11-12 19:10:08,687 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at
> Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch
> failure
>
> 2014-11-12 19:10:08,908 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Resubmitting failed stages
>
> 2014-11-12 19:10:08,974 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
> mapPartitions at Exchange.scala:86), which has no missing parents
>
> 2014-11-12 19:10:08,989 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:10:08,990 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
> (MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
> finished in 66.475 s
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - looking for newly runnable stages
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - running: Set()
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - waiting: Set(Stage 1)
>
> 2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - failed: Set()
>
> 2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()
>
> 2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
> SparkPlan.scala:84), which is now runnable
>
> 2014-11-12 19:11:15,482 INFO  spark.SparkContext
> (Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
> DAGScheduler.scala:838
>
> 2014-11-12 19:11:15,482 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
> (MappedRDD[16] at map at SparkPlan.scala:84)
>
> 2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
> (Logging.scala:logError(75)) - Lost executor 372 on
> ip-10-95-163-84.ec2.internal: remote Akka client disassociated
>
> 2014-11-12 19:11:21,655 WARN  remote.ReliableDeliverySupervisor
> (Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
> [akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
> address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
> (Logging.scala:logError(75)) - Asked to remove non-existent executor 372
>
> 2014-11-12 19:11:21,655 INFO  scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)
>
>
>
>
> On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood <sa...@gmail.com>
> wrote:
>
>> We are running Spark on YARN with combined memory > 1 TB, and when trying
>> to cache a table partition (which is < 100 GB) we are seeing a lot of failed
>> collect stages in the UI; the caching never succeeds. Because of the failed
>> collect, it seems like the mapPartitions stage keeps getting resubmitted. We
>> have more than enough memory, so it's surprising we are seeing this issue.
>> Can someone please help? Thanks!
>>
>> The stack trace of the failed collect from the UI is:
>>
>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
>> 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
>> 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
>> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> 	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
>> 	at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
>> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
>> 	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
>> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> 	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> 	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:745)
>>
>>
>

Re: Too many failed collects when trying to cache a table in SparkSQL

Posted by Sadhan Sood <sa...@gmail.com>.
This is the log output:

2014-11-12 19:07:16,561 INFO  thriftserver.SparkExecuteStatementOperation
(Logging.scala:logInfo(59)) - Running query 'CACHE TABLE xyz_cached AS
SELECT * FROM xyz where date_prefix = 20141112'

2014-11-12 19:07:17,455 INFO  Configuration.deprecation
(Configuration.java:warnOnceIfDeprecated(1009)) - mapred.map.tasks is
deprecated. Instead, use mapreduce.job.maps

2014-11-12 19:07:17,756 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 0 from broadcast at
TableReader.scala:68

2014-11-12 19:07:18,292 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Starting job: collect at SparkPlan.scala:84

2014-11-12 19:07:22,801 INFO  mapred.FileInputFormat
(FileInputFormat.java:listStatus(253)) - Total input paths to process : 200

2014-11-12 19:07:22,835 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Registering RDD 12 (mapPartitions at
Exchange.scala:86)

2014-11-12 19:07:22,837 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Got job 0 (collect at SparkPlan.scala:84)
with 1 output partitions (allowLocal=false)

2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Final stage: Stage 1(collect at
SparkPlan.scala:84)

2014-11-12 19:07:22,838 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Parents of final stage: List(Stage 0)

2014-11-12 19:07:22,842 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents: List(Stage 0)

2014-11-12 19:07:22,871 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents

2014-11-12 19:07:22,916 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 1 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:07:22,963 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 461 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:10:04,088 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 161.113 s

2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:10:04,089 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:10:04,090 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:10:04,094 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:10:04,097 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:10:04,112 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 2 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:10:04,115 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:10:08,541 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 52 on
ip-10-61-175-167.ec2.internal: remote Akka client disassociated

2014-11-12 19:10:08,543 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-61-175-167.ec2.internal:50918] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:10:08,548 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 52

2014-11-12 19:10:08,550 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 52 (epoch 1)

2014-11-12 19:10:08,555 INFO  scheduler.Stage (Logging.scala:logInfo(59)) -
Stage 0 is now unavailable on executor 52 (460/461, false)

2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Marking Stage 1 (collect at
SparkPlan.scala:84) as failed due to a fetch failure from Stage 0
(mapPartitions at Exchange.scala:86)

2014-11-12 19:10:08,686 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 1 (collect at SparkPlan.scala:84)
failed in 4.571 s

2014-11-12 19:10:08,687 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting Stage 0 (mapPartitions at
Exchange.scala:86) and Stage 1 (collect at SparkPlan.scala:84) due to fetch
failure

2014-11-12 19:10:08,908 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Resubmitting failed stages

2014-11-12 19:10:08,974 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 0 (MapPartitionsRDD[12] at
mapPartitions at Exchange.scala:86), which has no missing parents

2014-11-12 19:10:08,989 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 3 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:10:08,990 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 0
(MapPartitionsRDD[12] at mapPartitions at Exchange.scala:86)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Stage 0 (mapPartitions at Exchange.scala:86)
finished in 66.475 s

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - looking for newly runnable stages

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - running: Set()

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - waiting: Set(Stage 1)

2014-11-12 19:11:15,465 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - failed: Set()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Missing parents for Stage 1: List()

2014-11-12 19:11:15,466 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting Stage 1 (MappedRDD[16] at map at
SparkPlan.scala:84), which is now runnable

2014-11-12 19:11:15,482 INFO  spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 4 from broadcast at
DAGScheduler.scala:838

2014-11-12 19:11:15,482 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Submitting 1 missing tasks from Stage 1
(MappedRDD[16] at map at SparkPlan.scala:84)

2014-11-12 19:11:21,655 ERROR cluster.YarnClientClusterScheduler
(Logging.scala:logError(75)) - Lost executor 372 on
ip-10-95-163-84.ec2.internal: remote Akka client disassociated

2014-11-12 19:11:21,655 WARN  remote.ReliableDeliverySupervisor
(Slf4jLogger.scala:apply$mcV$sp(71)) - Association with remote system
[akka.tcp://sparkExecutor@ip-10-95-163-84.ec2.internal:20998] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].

2014-11-12 19:11:21,655 ERROR cluster.YarnClientSchedulerBackend
(Logging.scala:logError(75)) - Asked to remove non-existent executor 372

2014-11-12 19:11:21,655 INFO  scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Executor lost: 372 (epoch 3)




On Wed, Nov 12, 2014 at 12:31 PM, Sadhan Sood <sa...@gmail.com> wrote:

> We are running Spark on YARN with combined memory > 1 TB, and when trying to
> cache a table partition (which is < 100 GB) we are seeing a lot of failed
> collect stages in the UI; the caching never succeeds. Because of the failed
> collect, it seems like the mapPartitions stage keeps getting resubmitted. We
> have more than enough memory, so it's surprising we are seeing this issue.
> Can someone please help? Thanks!
>
> The stack trace of the failed collect from the UI is:
>
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
> 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:386)
> 	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:383)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> 	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:382)
> 	at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:178)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
> 	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:195)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
>
>
