Posted to user@spark.apache.org by Dan Dietterich <da...@yahoo.com.INVALID> on 2014/09/23 22:36:15 UTC
Spark SQL 1.1.0 - large insert into parquet runs out of memory
I am trying to load data from CSV format into Parquet using Spark SQL.
It consistently runs out of memory.
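The statements involved are roughly of this shape (a sketch only — the schema is abbreviated here to two stand-in columns, not the full catalog_returns DDL):

```sql
-- Abbreviated sketch of the CSV-to-Parquet load; table and column names
-- are stand-ins. The INSERT goes through Hive's Parquet writer.
CREATE TABLE catalog_returns_parquet (cr_item_sk INT, cr_return_amount DOUBLE)
    STORED AS PARQUET;

INSERT OVERWRITE TABLE catalog_returns_parquet
SELECT cr_item_sk, cr_return_amount
FROM catalog_returns_csv;
```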
The environment is:
* standalone cluster using HDFS and the Hive metastore from HDP 2.0
* Spark 1.1.0
* Parquet jar files (v1.5) explicitly added when starting spark-sql
* 20 workers - EC2 r3.large - with SPARK_DAEMON_MEMORY set to 10g
* 1 master - EC2 r3.xlarge
The input is split across 12 files:
hdfs dfs -ls /tpcds/fcsv/catalog_returns
Found 12 items
-rw-r--r-- 3 spark hdfs 282305091 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000000_0
-rw-r--r-- 3 spark hdfs 282037998 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000001_0
-rw-r--r-- 3 spark hdfs 276284419 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000002_0
-rw-r--r-- 3 spark hdfs 269675703 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000003_0
-rw-r--r-- 3 spark hdfs 269673166 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000004_0
-rw-r--r-- 3 spark hdfs 269678197 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000005_0
-rw-r--r-- 3 spark hdfs 153478133 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000006_0
-rw-r--r-- 3 spark hdfs 147586385 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000007_0
-rw-r--r-- 3 spark hdfs 147542545 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000008_0
-rw-r--r-- 3 spark hdfs 141161085 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000009_0
-rw-r--r-- 3 spark hdfs 12110104 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000010_0
-rw-r--r-- 3 spark hdfs 6374442 2014-09-22 11:31 /tpcds/fcsv/catalog_returns/000011_0
The failure stack from spark-sql is this:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.OutOfMemoryError: Java heap space
parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:97)
parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:124)
parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:146)
parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:308)
parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:233)
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:84)
parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119)
parquet.column.impl.ColumnWriterImpl.accountForValueWritten(ColumnWriterImpl.java:108)
parquet.column.impl.ColumnWriterImpl.write(ColumnWriterImpl.java:148)
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addDouble(MessageColumnIO.java:306)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writePrimitive(DataWritableWriter.java:133)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.writeData(DataWritableWriter.java:75)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriter.write(DataWritableWriter.java:55)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:59)
org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport.write(DataWritableWriteSupport.java:31)
parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:115)
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:81)
parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:37)
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:77)
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.write(ParquetRecordWriterWrapper.java:90)
org.apache.spark.sql.hive.SparkHiveHadoopWriter.write(SparkHadoopWriter.scala:98)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:151)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Am I missing something? Is this a case of "wrong tool for the job"?
Regards,
dd
Re: Spark SQL 1.1.0 - large insert into parquet runs out of memory
Posted by Dan Dietterich <da...@yahoo.com>.
I have only been using Spark through the SQL front-end (CLI or JDBC). I don't
think I have access to saveAsParquetFile from there, do I?
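For what it's worth, from the SQL front-end the Parquet writer's buffering can sometimes be reduced by shrinking the row-group size. Whether spark-sql 1.1 actually propagates these Hadoop settings through to Hive's Parquet writer is an assumption worth verifying:

```sql
-- Hypothetical mitigation: smaller row groups mean smaller in-memory
-- column buffers before each flush (the parquet-mr default is 128 MB).
SET parquet.block.size=33554432;  -- 32 MB row groups
SET parquet.page.size=1048576;    -- 1 MB pages (the default)
```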
Re: Spark SQL 1.1.0 - large insert into parquet runs out of memory
Posted by Michael Armbrust <mi...@databricks.com>.
I would hope that things should work for this kind of workflow.
I'm curious if you have tried using saveAsParquetFile instead of inserting
directly into a hive table (you could still register this as an external
table afterwards). Right now, inserting into Hive tables goes through
their SerDe instead of our native Parquet code, so we have less
control over what is happening. If you go down the saveAsParquetFile route
you might try repartition to increase the number of partitions (and thus
decrease the amount of data buffered per partition).
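That suggestion, sketched in Spark 1.1-era Scala — the input path, the '|' delimiter, the two-field case class, and the partition count are all assumptions for illustration; saveAsParquetFile and the createSchemaRDD implicit are the 1.1 SchemaRDD APIs:

```scala
// Sketch of the suggested alternative path, assuming Spark 1.1 APIs
// and a running HiveContext. Schema and paths are stand-ins.
import org.apache.spark.sql.hive.HiveContext

case class CatalogReturn(crItemSk: Int, crReturnAmount: Double)

val hiveContext = new HiveContext(sc)
import hiveContext.createSchemaRDD  // implicit RDD[Product] -> SchemaRDD

val rows = sc.textFile("hdfs:///tpcds/fcsv/catalog_returns")
  .repartition(200)  // more, smaller partitions => less data buffered per writer
  .map(_.split('|'))
  .map(f => CatalogReturn(f(0).toInt, f(1).toDouble))

// Write Parquet directly, bypassing Hive's SerDe path.
rows.saveAsParquetFile("hdfs:///tpcds/parquet/catalog_returns")
```

Afterwards the output directory could be registered as an external table (CREATE EXTERNAL TABLE ... STORED AS PARQUET LOCATION ...) for SQL access.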
On Tue, Sep 23, 2014 at 1:36 PM, Dan Dietterich <
dan_dietterich@yahoo.com.invalid> wrote:

> […]