Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/04/01 13:09:52 UTC

[jira] [Resolved] (SPARK-1001) Memory leak when reading sequence file and then sorting

     [ https://issues.apache.org/jira/browse/SPARK-1001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-1001.
------------------------------
    Resolution: Cannot Reproduce

> Memory leak when reading sequence file and then sorting
> -------------------------------------------------------
>
>                 Key: SPARK-1001
>                 URL: https://issues.apache.org/jira/browse/SPARK-1001
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle, Spark Core
>    Affects Versions: 0.8.0
>            Reporter: Matthew Cheah
>              Labels: Hadoop, Memory
>
> Spark appears to build up a backlog of unreachable byte arrays when an RDD is constructed from a sequence file, and then that RDD is sorted.
> I have a class that wraps a Java ArrayList and can be serialized and written to a Hadoop SequenceFile (i.e., it implements the Writable interface). Let's call it WritableDataRow. Its constructor takes a Java List to wrap, and it also has a copy constructor.
> Setup: 10 slaves launched via EC2 with 65.9GB RAM each. The dataset is 100GB of text, or 120GB in sequence file format (no compression is used to compact the bytes). The Hadoop cluster is backed by CDH4.2.0.
> First, building the RDD from a CSV and then sorting on index 1 works fine:
> {code}
> scala> import scala.collection.JavaConversions._ // Other imports here as well
> import scala.collection.JavaConversions._
> scala> val rddAsTextFile = sc.textFile("s3n://some-bucket/events-*.csv")
> rddAsTextFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14
> scala> val rddAsWritableDataRows = rddAsTextFile.map(x => new WritableDataRow(x.split("\\|").toList))
> rddAsWritableDataRows: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[2] at map at <console>:19
> scala> val rddAsKeyedWritableDataRows = rddAsWritableDataRows.map(x => (x.getContents().get(1).toString(), x));
> rddAsKeyedWritableDataRows: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MappedRDD[4] at map at <console>:22
> scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsKeyedWritableDataRows)
> orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@587acb54
> scala> orderedFunct.sortByKey(true).count(); // Actually triggers the computation, as stated in a different e-mail thread
> res0: org.apache.spark.rdd.RDD[(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = MapPartitionsRDD[8] at sortByKey at <console>:27
> {code}
> The above works without too many surprises. I then save the data as a SequenceFile. I use JavaPairRDD because it makes saveAsHadoopFile() easier to call, and because this is how it's done in our Java-based application:
> {code}
> scala> val pairRDD = new JavaPairRDD(rddAsWritableDataRows.map(x => (NullWritable.get(), x)));
> pairRDD: org.apache.spark.api.java.JavaPairRDD[org.apache.hadoop.io.NullWritable,com.palantir.finance.datatable.server.spark.WritableDataRow] = org.apache.spark.api.java.JavaPairRDD@8d2e9d9
> scala> pairRDD.saveAsHadoopFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow], classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[NullWritable, WritableDataRow]]);
> …
> 2013-12-11 20:09:14,444 [main] INFO  org.apache.spark.SparkContext - Job finished: saveAsHadoopFile at <console>:26, took 1052.116712748 s
> {code}
> Next I build an RDD from the sequence file and sort that one instead. This is where monitoring Ganglia and "ps aux" shows memory usage climbing excessively:
> {code}
> scala> val rddAsSequenceFile = sc.sequenceFile("hdfs://<hdfs-master-url>:9010/blah", classOf[NullWritable], classOf[WritableDataRow]).map(x => new WritableDataRow(x._2)); // Invokes copy constructor to get around re-use of writable objects
> rddAsSequenceFile: org.apache.spark.rdd.RDD[com.palantir.finance.datatable.server.spark.WritableDataRow] = MappedRDD[19] at map at <console>:19
> scala> val orderedFunct = new org.apache.spark.rdd.OrderedRDDFunctions[String, WritableDataRow, (String, WritableDataRow)](rddAsSequenceFile.map(x => (x.getContents().get(1).toString(), x)))
> orderedFunct: org.apache.spark.rdd.OrderedRDDFunctions[String,com.palantir.finance.datatable.server.spark.WritableDataRow,(String, com.palantir.finance.datatable.server.spark.WritableDataRow)] = org.apache.spark.rdd.OrderedRDDFunctions@6262a9a6
> scala> orderedFunct.sortByKey().count();
> {code}
> (On the need to copy Writables read from Hadoop RDDs, see: https://mail-archives.apache.org/mod_mbox/spark-user/201308.mbox/%3CCAF_KkPzrq4OTyQVwcOC6pLAz9X9_SFo33u4ySatki5PTqoYEDA@mail.gmail.com%3E )
> I got a memory dump from one worker node but can't share it. I've attached a screenshot from YourKit. At the point where around 5GB of RAM is in use on the worker, 3GB of unreachable byte arrays have accumulated. Furthermore, they are all exactly the same size, and that size appears to match most of the byte arrays that are strongly reachable. The strongly reachable byte arrays are referenced from output streams in the block output writers.
> Let me know if you require any more information. Thanks.
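
The description copies each value read from the sequence file "to get around re-use of writable objects". The following is a minimal sketch of why such a copy matters, and it makes no claim about the cause of the reported leak. It uses no Hadoop or Spark classes: MutableRow, reusingReader and WritableReuseSketch are hypothetical stand-ins invented for this illustration, with MutableRow playing the role of a Writable such as WritableDataRow, and reusingReader playing the role of a record reader that refills one shared instance for every record.

```scala
// Hypothetical stand-in for a Hadoop Writable value: a mutable holder
// that a reader refills in place. Not part of Spark or Hadoop.
final class MutableRow(var contents: List[String]) {
  // Copy-constructor analogue: captures the other row's current contents
  // in a new, independent object.
  def this(other: MutableRow) = this(other.contents)
}

object WritableReuseSketch {
  // Mimics a record reader that returns the SAME instance for every record,
  // overwriting its contents each time (an assumption made for illustration).
  def reusingReader(records: Seq[List[String]]): Iterator[MutableRow] = {
    val shared = new MutableRow(Nil)
    records.iterator.map { r =>
      shared.contents = r
      shared
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Seq(List("a", "1"), List("b", "2"), List("c", "3"))

    // Buffering references without copying: every element is the same object,
    // so all of them show whatever was read last.
    val aliased = reusingReader(records).toList

    // Copying each record as it is read keeps the values distinct.
    val copied = reusingReader(records).map(row => new MutableRow(row)).toList

    println(aliased.map(_.contents(1))) // prints List(3, 3, 3)
    println(copied.map(_.contents(1)))  // prints List(1, 2, 3)
  }
}
```

Any operation that buffers records, such as a sort, holds on to references in the same way as the toList call above, which is why the copy is made in the map step before the data is keyed and sorted.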



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
