Posted to dev@spark.apache.org by vladio <vl...@palantir.com> on 2015/06/23 20:26:11 UTC

[DataFrame] partitionBy issues

Hi,

I'm running into a strange memory scaling issue when using the partitionBy
feature of DataFrameWriter.

I've generated a table (a CSV file) with 3 columns (A, B and C) and 32*32
rows, about 20 KB on disk. There are 32 distinct values for column A and 32
distinct values for column B, and every combination appears exactly once
(column C just holds a random number per row - it doesn't matter), giving a
32*32-row data set. I imported this into Spark and ran partitionBy("A", "B")
to test its performance. This should create a nested directory structure
with 32 folders, each of them containing another 32 folders. The write uses
about 10 GB of RAM and runs slowly. If I increase the number of rows from
32*32 to 128*128, I get a Java heap space OutOfMemoryError no matter what
value I use for the heap size. Is this a known bug?

Scala code:
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .load("table.csv")
df.write.partitionBy("A", "B").mode("overwrite").parquet("table.parquet")

How I ran the Spark shell:
bin/spark-shell --driver-memory 16g --master local[8] --packages
com.databricks:spark-csv_2.10:1.0.3

Attached you'll find the table.csv I used:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/n12838/table.csv>
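
In case the attachment doesn't come through, here is a rough sketch of how
an equivalent table could be generated (this is only an approximation of the
data described above; the exact random values in column C don't matter):

    // Sketch: write a CSV with every combination of 32 A values and 32 B values,
    // plus a random number in column C. Not the original generator, just an
    // equivalent one for reproducing the issue.
    import java.io.PrintWriter
    import scala.util.Random

    val out = new PrintWriter("table.csv")
    out.println("A,B,C")
    for (a <- 0 until 32; b <- 0 until 32) {
      out.println(s"$a,$b,${Random.nextInt(1000)}")
    }
    out.close()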

Thank you,
Vlad





Re: [DataFrame] partitionBy issues

Posted by rake <ra...@randykerber.com>.
I ran into a similar problem: reading a CSV file into a DataFrame and saving
it to Parquet with 'partitionBy', I got an OutOfMemory error even though the
data file is not large.

I discovered that by default Spark appears to allocate a 128 MB in-memory
buffer for each output Parquet partition, controlled by a
"parquet.block.size" parameter. So if there are lots of Parquet partitions,
it's easy to run out of memory even if the actual amount of data is small:
with 32*32 = 1024 partitions, that is on the order of 128 GB of buffer space
if every partition writer is open at once.

I tried repeatedly to override the 128 MB default by setting properties with
names like "block.size", "parquet.block.size", and
"spark.sql.parquet.block.size" on the SparkContext, on the SQLContext, and
as an option() on the DataFrameWriter when calling DataFrameWriter.parquet.
None of those worked: no matter what, my setting was ignored and the default
value of 128 MB was used (according to the spark-shell log output).

Eventually I found a pointer to sparkContext.hadoopConfiguration. For
example, to set the block size to 16 MB, try:

    sparkContext.hadoopConfiguration.setInt( "dfs.blocksize", 1024 * 1024 *
16 )
    sparkContext.hadoopConfiguration.setInt( "parquet.block.size", 1024 *
1024 * 16 )

That worked. Still, it seems like some of the other options I tried (e.g.,
"spark.sql.parquet.block.size") ought to be supported as well.

-- Randy

Randy Kerber
Data Science Consultant


vladio wrote
> I'm running into a strange memory scaling issue when using the partitionBy
> feature of DataFrameWriter. [...] If I increase the number of rows from
> 32*32 to 128*128, I get a Java heap space OutOfMemoryError no matter what
> value I use for the heap size. Is this a known bug?







Re: [DataFrame] partitionBy issues

Posted by vladio <vl...@palantir.com>.
https://issues.apache.org/jira/browse/SPARK-8597

A JIRA ticket discussing the same problem, with more insight than this thread.


