Posted to user@spark.apache.org by jatinpreet <ja...@gmail.com> on 2017/02/22 06:11:04 UTC

Spark SQL : Join operation failure

Hi,

I am having a hard time running an outer join operation on two Parquet
datasets. The datasets are large, roughly 500 GB, with a lot of columns, on
the order of 1000.

As per the administrator-imposed limits on the YARN queue, I can have a total
of 20 vcores and 8 GB of memory per executor.

I specified memory overhead and increased the number of shuffle partitions,
to no avail. This is how I submitted the job with PySpark:

spark-submit --master yarn-cluster --executor-memory 5500m --num-executors 19 \
  --executor-cores 1 --conf spark.yarn.executor.memoryOverhead=2000 \
  --conf spark.sql.shuffle.partitions=2048 --driver-memory 7g \
  --queue <queue_name> ./<python_script>

The relevant code is:

cm_go.registerTempTable("x")
ko.registerTempTable("y")
joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
joined_df.write.save("/user/data/output")


I am getting errors like these:

ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
Reason: Container marked as failed:
container_e36_1487531133522_0058_01_000006 on host: dn2.bigdatalab.org. Exit
status: 52. Diagnostics: Exception from container-launch.
Container id: container_e36_1487531133522_0058_01_000006
Exit code: 52
Stack trace: ExitCodeException exitCode=52: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:933)
	at org.apache.hadoop.util.Shell.run(Shell.java:844)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1123)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:225)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 52

----------------------------------------------------------------------------------------------------------

FetchFailed(null, shuffleId=0, mapId=-1, reduceId=508, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:695)
	at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:691)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:691)
	at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:145)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
	at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

)



I would appreciate it if someone could help me out with this.






Re: Spark SQL : Join operation failure

Posted by Yong Zhang <ja...@hotmail.com>.
Your error message does not make clear what is really happening.


Was your container killed by YARN, or did it actually run out of memory (OOM)?

When I run a Spark job on big data, here is what I normally do:

1) Enable GC output. You need to monitor the GC output on the executors to understand the GC pressure. If you see frequent full GCs, you know your job is in danger (one way to turn GC logging on is sketched after this list).
2) Monitor the statistics of the tasks on the executors that show frequent full GCs: how many records have been processed so far, and what the spill read/write bytes are. Is the OOM only happening in one task with much higher statistics than the rest? That normally means data skew. If lots of tasks all show GC pressure, then your settings are simply not enough for the job.
3) In your case, you first want to know what kind of join Spark is using for your outer join. Does it make sense for your data? The wrong join strategy leads to the wrong way of doing the job (the sketch below also shows how to inspect the plan).
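
A minimal PySpark sketch of what (1)-(3) could look like, assuming the same cm_go / ko DataFrames and the field1 / field2 join keys from the original post; the parquet paths below are placeholders, and the GC flags are standard HotSpot options rather than anything Spark-specific:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# (1) Ask the executor JVMs to print GC activity; the output lands in each
# executor's stderr log. Equivalently, pass this via
# --conf spark.executor.extraJavaOptions=... on the spark-submit line.
conf = (SparkConf()
        .set("spark.executor.extraJavaOptions",
             "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"))
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)

# The two Parquet inputs from the original post (paths are hypothetical).
cm_go = sqlCtx.read.parquet("/user/data/cm_go")
ko = sqlCtx.read.parquet("/user/data/ko")

# (2) A quick skew check: look at the heaviest values of each join key.
# A few keys with counts far above the rest usually means data skew.
cm_go.groupBy("field1").count().orderBy("count", ascending=False).show(20)
ko.groupBy("field2").count().orderBy("count", ascending=False).show(20)

# (3) Inspect which join strategy Spark picked (e.g. SortMergeJoin vs.
# BroadcastHashJoin) before running the expensive write.
cm_go.registerTempTable("x")
ko.registerTempTable("y")
joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
joined_df.explain()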

Yong



Re: Spark SQL : Join operation failure

Posted by neil90 <ne...@icloud.com>.
It might be a memory issue. Try adding .persist(StorageLevel.MEMORY_AND_DISK) so that
if the joined data can't fit into memory, Spark spills the partitions that don't fit to disk.


from pyspark import StorageLevel

cm_go.registerTempTable("x")
ko.registerTempTable("y")
joined_df = sqlCtx.sql("select * from x FULL OUTER JOIN y ON field1=field2")
# Cache the join result in memory, spilling whatever doesn't fit to disk.
joined_df.persist(StorageLevel.MEMORY_AND_DISK)
joined_df.write.save("/user/data/output")
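
One small follow-up, not in the original reply: once the write has finished, the cached blocks can be released, and the Storage tab of the Spark UI will show how much of the join actually spilled to disk.

# After joined_df.write.save(...) completes, free the cached blocks.
joined_df.unpersist()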



