Posted to user@spark.apache.org by Dan Dietterich <da...@yahoo.com.INVALID> on 2014/09/23 22:36:15 UTC

Spark SQL 1.1.0 - large insert into parquet runs out of memory

I am trying to load data from csv format into parquet using Spark SQL.
It consistently runs out of memory.

The environment is:
	* standalone cluster using HDFS and Hive metastore from HDP2.0
	* Spark 1.1.0
	* parquet jar files (v1.5) explicitly added when starting spark-sql
	* 20 workers - ec2 r3.large - set with SPARK_DAEMON_MEMORY of 10g
	* 1 master - ec2 r3.xlarge


The input is split across 12 files:
hdfs dfs -ls /tpcds/fcsv/catalog_returns
Found 12 items
-rw-r--r--   3 spark hdfs  282305091 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000000_0
-rw-r--r--   3 spark hdfs  282037998 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000001_0
-rw-r--r--   3 spark hdfs  276284419 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000002_0
-rw-r--r--   3 spark hdfs  269675703 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000003_0
-rw-r--r--   3 spark hdfs  269673166 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000004_0
-rw-r--r--   3 spark hdfs  269678197 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000005_0
-rw-r--r--   3 spark hdfs  153478133 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000006_0
-rw-r--r--   3 spark hdfs  147586385 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000007_0
-rw-r--r--   3 spark hdfs  147542545 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000008_0
-rw-r--r--   3 spark hdfs  141161085 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000009_0
-rw-r--r--   3 spark hdfs   12110104 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000010_0
-rw-r--r--   3 spark hdfs    6374442 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000011_0

The failure stack from spark-sql is this:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.OutOfMemoryError: Java heap space
        parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:97)
        parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:124)
        parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:146)
        parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:308)
        parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:233)
        parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:84)
        parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
        parquet.column.impl.ColumnWriterImpl.accountForValueWritten(ColumnWriterImpl.java:108)
        parquet.column.impl.ColumnWriterImpl.write(ColumnWriterImpl.java:148)
        parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:306)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:133)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeData(DataWritableWriter.java:75)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:55)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
        org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
        parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
        parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
        parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
        org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:77)
        org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:90)
        org.apache.spark.sql.hive.SparkHiveHadoopWriter.write(SparkHadoopWriter.scala:98)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:151)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

Am I missing something? Is this a case of "wrong tool for the job"? 

Regards,
dd

Re: Spark SQL 1.1.0 - large insert into parquet runs out of memory

Posted by Dan Dietterich <da...@yahoo.com>.
I have only been using Spark through the SQL front end (CLI or JDBC). I don't
think I have access to saveAsParquetFile from there, do I?


Re: Spark SQL 1.1.0 - large insert into parquet runs out of memory

Posted by Michael Armbrust <mi...@databricks.com>.
I would hope that things should work for this kind of workflow.

I'm curious if you have tried using saveAsParquetFile instead of inserting
directly into a hive table (you could still register this as an external
table afterwards).  Right now inserting into Hive tables is going
through their SerDe instead of our native parquet code, so we have less
control over what is happening.  If you go down the saveAsParquetFile route
you might try repartition to increase the number of partitions (and thus
decrease the amount of data buffered per partition).
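
To get a rough feel for why more partitions could help, here is a small
back-of-the-envelope sketch in Python. It only uses the file sizes from the
hdfs listing in the original post. The partition counts (12, 48, 200) and the
helper name avg_bytes_per_partition are hypothetical choices for illustration.
It also assumes the data spreads evenly across partitions, which repartition
aims for but the raw files do not. CSV bytes are not the same thing as the
memory Parquet buffers while writing, so treat the numbers as a trend, not a
heap estimate.

```python
# File sizes in bytes, copied from the `hdfs dfs -ls` listing in the original post.
FILE_SIZES = [
    282305091, 282037998, 276284419, 269675703, 269673166, 269678197,
    153478133, 147586385, 147542545, 141161085, 12110104, 6374442,
]

MIB = 1024 * 1024


def avg_bytes_per_partition(total_bytes, num_partitions):
    """Average input bytes per partition, rounded up, assuming an even spread."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return -(-total_bytes // num_partitions)  # ceiling division on integers


total = sum(FILE_SIZES)
print("total input: %.1f MiB" % (total / MIB))
print("largest single file: %.1f MiB" % (max(FILE_SIZES) / MIB))

# Hypothetical partition counts; none of these come from the thread.
for n in (12, 48, 200):
    per_part = avg_bytes_per_partition(total, n)
    print("%4d partitions -> about %.1f MiB of input each" % (n, per_part / MIB))
```

The number of partitions Spark actually reads depends on the HDFS block size
and input format, not just the file count, so the 12-partition row is only a
baseline for comparison.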
