Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2019/10/08 05:44:14 UTC

[jira] [Resolved] (SPARK-24922) Iterative RDD union + reduceByKey operations on a small dataset lead to a "No space left on device" error due to excessive shuffle spill.

     [ https://issues.apache.org/jira/browse/SPARK-24922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24922.
----------------------------------
    Resolution: Incomplete

> Iterative RDD union + reduceByKey operations on a small dataset lead to a "No space left on device" error due to excessive shuffle spill.
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24922
>                 URL: https://issues.apache.org/jira/browse/SPARK-24922
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.3.0
>         Environment: Java 8, Scala 2.11.8, Spark 2.3.0, sbt 0.13.16
>  
>            Reporter: Dinesh Dharme
>            Priority: Major
>              Labels: bulk-closed
>
> I am trying to perform a few (union + reduceByKey) operations on a hierarchical dataset in an iterative fashion using RDDs. The first few loops run fine, but on subsequent loops the operations end up consuming the entire scratch space provided to them.
> I have set the scratch directory, i.e. SPARK_LOCAL_DIRS, to a volume with *100 GB* of space.
> The hierarchical dataset, whose size is small (< 400 kB), remains constant throughout the iterations.
> I have tried enabling the worker cleanup flag ("spark.worker.cleanup.enabled=true"), but it has no effect.
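> Below is a minimal, hypothetical sketch of the kind of loop described above (names, types, and data are illustrative, not taken from the linked repository). Each pass unions the accumulated RDD with the base data and re-aggregates, so every iteration adds a new shuffle stage whose files pile up under SPARK_LOCAL_DIRS for the lifetime of the application:
> {noformat}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder().appName("shuffle-spill-sketch").master("local[2]").getOrCreate()
> val sc = spark.sparkContext
>
> // `base` stands in for the small (< 400 kB) hierarchical dataset.
> val base: RDD[(String, Double)] = sc.parallelize(Seq(("a", 1.0), ("b", 2.0)))
>
> var acc: RDD[(String, Double)] = base
> for (_ <- 1 to 20) {
>   // union + reduceByKey: each pass triggers a full shuffle, and shuffle
>   // files from earlier passes remain on disk in the scratch directory.
>   acc = acc.union(base).reduceByKey(_ + _)
>   acc.count() // force materialization on every iteration
> }
> {noformat}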
>  
> Error:
>  
> {noformat}
> Caused by: java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:326)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
>  
> *What I am trying to do (High Level)*:
> I have a dataset of five CSV files (Parent, Child1, Child2, Child21, Child22) that are related hierarchically, as shown below.
> Parent-> Child1 -> Child2  -> Child21 
> Parent-> Child1 -> Child2  -> Child22 
> Each element in the tree has 14 columns (elementid, parentelement_id, cat1, cat2, num1, num2, ..., num10).
> I am trying to aggregate the values of one column of Child21 into Child1 (i.e. two levels up), and do the same for another column of Child22 into Child1. I then merge these aggregated values at the Child1 level.
> This is implemented in the code at:
> spark.rddexample.dummyrdd.tree.child1.events.Function1
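> A rough illustration of the two-level rollup follows. This is a hedged sketch reusing `sc` from the earlier snippet; the tuple layout (elementId, parentId, value) and the column choice are assumptions, not the actual schema used in the repository:
> {noformat}
> import org.apache.spark.rdd.RDD
>
> // Hypothetical inputs: each element is (elementId, parentId, value).
> val child2:  RDD[(String, String, Double)] = sc.parallelize(Seq(("c2-1", "c1-1", 0.0)))
> val child21: RDD[(String, String, Double)] = sc.parallelize(Seq(("c21-1", "c2-1", 5.0)))
>
> // Sum one Child21 column, keyed by its Child2 parent id.
> val child21Sums = child21
>   .map { case (_, parentId, num) => (parentId, num) }
>   .reduceByKey(_ + _)
>
> // Hop up one more level: map each Child2 id to its Child1 parent, join, re-aggregate.
> val atChild1 = child2
>   .map { case (elementId, parentId, _) => (elementId, parentId) }
>   .join(child21Sums)                                    // shuffle join on the Child2 id
>   .map { case (_, (child1Id, sum)) => (child1Id, sum) }
>   .reduceByKey(_ + _)                                   // aggregated at the Child1 level
> {noformat}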
>  
>  
> *Code which replicates the issue*: 
> 1] [https://github.com/dineshdharme/SparkRddShuffleIssue]
>  
> *Steps to reproduce the issue:*
> 1] Clone the above repository.
> 2] Copy the CSVs from the "issue-data" folder of the repository to the HDFS location "hdfs:///tree/dummy/data/".
> 3] Set the Spark scratch directory (SPARK_LOCAL_DIRS) to a folder with ample space (> *100 GB*); a driver-side configuration sketch follows these steps.
> 4] Run "sbt assembly"
> 5] Run the following command from the project root:
> /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
>  --class spark.rddexample.dummyrdd.FunctionExecutor \
>  --master local[2] \
>  --deploy-mode client \
>  --executor-memory 2G \
>  --driver-memory 2G \
>  target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
>  20 \
>  hdfs:///tree/dummy/data/ \
>  hdfs:///tree/dummy/results/   
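> For reference, step 3 can also be expressed in the driver via the spark.local.dir configuration (the SPARK_LOCAL_DIRS environment variable takes precedence when set). The sketch below is illustrative only and is not part of the linked reproduction; the scratch and checkpoint paths are made-up examples:
> {noformat}
> import org.apache.spark.{SparkConf, SparkContext}
>
> // Point Spark's shuffle/spill scratch space at a large volume.
> val conf = new SparkConf()
>   .setAppName("rdd-shuffle-repro")
>   .setMaster("local[2]")
>   .set("spark.local.dir", "/mnt/bigdisk/spark-scratch") // hypothetical path
> val sc = new SparkContext(conf)
>
> // Periodic checkpointing is a common way to truncate RDD lineage in
> // iterative loops like the one above (shown here only for context):
> sc.setCheckpointDir("hdfs:///tree/dummy/checkpoints/")
> // inside the loop, e.g.: if (i % 5 == 0) { acc.checkpoint(); acc.count() }
> {noformat}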


