You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Marcelo Vanzin (JIRA)" <ji...@apache.org> on 2016/09/14 23:30:20 UTC

[jira] [Updated] (SPARK-17549) InMemoryRelation doesn't scale to large tables

     [ https://issues.apache.org/jira/browse/SPARK-17549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marcelo Vanzin updated SPARK-17549:
-----------------------------------
    Attachment: spark-1.6.patch
                example_1.6_pre_patch.png
                example_1.6_post_patch.png
                create_parquet.scala

> InMemoryRelation doesn't scale to large tables
> ----------------------------------------------
>
>                 Key: SPARK-17549
>                 URL: https://issues.apache.org/jira/browse/SPARK-17549
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0
>            Reporter: Marcelo Vanzin
>         Attachments: create_parquet.scala, example_1.6_post_patch.png, example_1.6_pre_patch.png, spark-1.6.patch
>
>
> An {{InMemoryRelation}} is created when you cache a table; but if the table is large, defined by either having a really large amount of columns, or a really large amount of partitions (in the file split sense, not the "table partition" sense), or both, it causes an immense amount of memory to be used in the driver.
> The reason is that it uses an accumulator to collect statistics about each partition, and instead of summarizing the data in the driver, it keeps *all* entries in memory.
> I'm attaching a script I used to create a parquet file with 20,000 columns and a single row, which I then copied 500 times so I'd have 500 partitions.
> When doing the following:
> {code}
> sqlContext.read.parquet(...).count()
> {code}
> Everything works fine, both in Spark 1.6 and 2.0. (It's super slow with the settings I used, but it works.)
> I ran spark-shell like this:
> {code}
> ./bin/spark-shell --master 'local-cluster[4,1,4096]' --driver-memory 2g --conf spark.executor.memory=2g
> {code}
> And ran:
> {code}
> sqlContext.read.parquet(...).cache().count()
> {code}
> You'll see the results in screenshot {{example_1.6_pre_patch.png}}. After 40 partitions were processed, there were 40 GenericInternalRow objects with
> 100,000 items each (5 stat info fields * 20,000 columns). So, memory usage was:
> {code}
>   40 * 100000 * (4 * 20 + 24) = 416000000 =~ 400MB
> {code}
> (Note: Integer = 20 bytes, Long = 24 bytes.)
> If I waited until the end, there would be 500 partitions, so ~ 5GB of memory to hold the stats.
> I'm also attaching a patch I made on top of 1.6 that uses just a long accumulator to capture the table size; with that patch memory usage on the driver doesn't keep growing. Also note in the patch that I'm multiplying the column size by the row count, which I think is a different bug in the existing code (those stats should be for the whole batch, not just a single row, right?). I also added {{example_1.6_post_patch.png}} to show the {{InMemoryRelation}} with the patch.
> I also applied a very similar patch on top of Spark 2.0. But there things blow up even more spectacularly when I try to run the count on the cached table. It starts with this error:
> {noformat}
> 14:19:43 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 2, vanzin-st1-3.gce.cloudera.com): java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
> (lots of generated code here...)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 63235, Size: 1
> 	at java.util.ArrayList.rangeCheck(ArrayList.java:635)
> 	at java.util.ArrayList.get(ArrayList.java:411)
> 	at org.codehaus.janino.util.ClassFile.getConstantPoolInfo(ClassFile.java:556)
> 	at org.codehaus.janino.util.ClassFile.getConstantUtf8(ClassFile.java:572)
> 	at org.codehaus.janino.util.ClassFile.loadAttribute(ClassFile.java:1513)
> 	at org.codehaus.janino.util.ClassFile.loadAttributes(ClassFile.java:644)
> 	at org.codehaus.janino.util.ClassFile.loadFields(ClassFile.java:623)
> 	at org.codehaus.janino.util.ClassFile.<init>(ClassFile.java:280)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:913)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anonfun$recordCompilationStats$1.apply(CodeGenerator.scala:911)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.recordCompilationStats(CodeGenerator.scala:911)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:883)
> 	... 54 more
> {noformat}
> And basically a lot of that going on making the output unreadable, so I just killed the shell. Anyway, I believe the same fix should work there, but I can't be sure because the test doesn't work for different reasons, it seems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org