Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2016/07/01 01:54:11 UTC

[jira] [Updated] (SPARK-14138) Generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames

     [ https://issues.apache.org/jira/browse/SPARK-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-14138:
---------------------------------
    Fix Version/s: 1.6.2

> Generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-14138
>                 URL: https://issues.apache.org/jira/browse/SPARK-14138
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Sven Krasser
>            Assignee: Kazuaki Ishizaki
>             Fix For: 1.6.2, 2.0.0
>
>
> The generated {{SpecificColumnarIterator}} code for wide DataFrames can exceed the JVM's 64 KB per-method bytecode limit when the DataFrame is cached. This snippet reproduces the error in spark-shell (with 5G driver memory) by creating a new DataFrame with more than 2000 aggregation-based columns:
> {code}
> val df = sc.parallelize(1 to 10).toDF()
> val aggr = (1 to 2260).map(colnum => avg(df.col("_1")).as(s"col_$colnum"))
> val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache()
> res.show() // this will break
> {code}
> The following error is produced (pruned for brevity):
> {noformat}
> /* 001 */
> /* 002 */ import java.nio.ByteBuffer;
> /* 003 */ import java.nio.ByteOrder;
> /* 004 */ import scala.collection.Iterator;
> /* 005 */ import org.apache.spark.sql.types.DataType;
> /* 006 */ import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
> /* 007 */ import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
> /* 008 */ import org.apache.spark.sql.execution.columnar.MutableUnsafeRow;
> /* 009 */
> /* 010 */ public SpecificColumnarIterator generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
> /* 011 */   return new SpecificColumnarIterator();
> /* 012 */ }
> /* 013 */
> ...
> /* 9113 */     accessor2261.extractTo(mutableRow, 2261);
> /* 9114 */     unsafeRow.pointTo(bufferHolder.buffer, 2262, bufferHolder.totalSize());
> /* 9115 */     return unsafeRow;
> /* 9116 */   }
> /* 9117 */ }
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572)
> 	at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
> 	at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
> 	... 28 more
> Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "()Z" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator" grows beyond 64 KB
> 	at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
> 	at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
> 	at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
> 	at org.codehaus.janino.UnitCompiler.invoke(UnitCompiler.java:10050)
> 	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4008)
> 	at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185)
> 	at org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263)
> 	at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
> 	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
> 	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
> 	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:3927)
> 	at org.codehaus.janino.UnitCompiler.access$6900(UnitCompiler.java:185)
> 	at org.codehaus.janino.UnitCompiler$10.visitMethodInvocation(UnitCompiler.java:3263)
> 	at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:3974)
> 	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
> 	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
> 	at org.codehaus.janino.UnitCompiler.invokeConstructor(UnitCompiler.java:6681)
> 	at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4126)
> 	at org.codehaus.janino.UnitCompiler.access$7600(UnitCompiler.java:185)
> 	at org.codehaus.janino.UnitCompiler$10.visitNewClassInstance(UnitCompiler.java:3275)
> 	at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:4085)
> 	at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:3290)
> 	at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4368)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2669)
> 	at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
> 	at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
> 	at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
> 	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
> 	at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
> 	at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
> 	at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
> 	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
> 	at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
> 	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
> 	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822)
> 	at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
> 	at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
> 	at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
> 	at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
> 	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
> 	at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
> 	at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
> 	at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
> 	at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
> 	at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
> 	at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
> 	at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
> 	at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
> 	at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
> 	at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
> 	at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
> 	at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:550)
> 	... 32 more
> {noformat}
> Note that the issue does not occur (and the {{.show()}} call prints the correct results) when the number of aggregation columns is reduced slightly, to 2250 instead of 2260 in this case:
> {code}
> val df = sc.parallelize(1 to 10).toDF()
> val aggr = (1 to 2250).map(colnum => avg(df.col("_1")).as(s"col_$colnum")) // only 2250
> val res = df.groupBy("_1").agg(count("_1"), aggr: _*).cache()
> res.show() // this will work
> {code}
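
A rough back-of-the-envelope check of the two column counts, offered as a sketch rather than a measurement. It assumes the oversized method is dominated by per-column code, and that each case carries two columns beyond the aggregations (the grouping key and the count, which is consistent with the 2262 fields visible in the trace). The object and value names are made up for this illustration.

```scala
object MethodSizeEstimate {
  // The class-file format requires a method's code array to be shorter than 65536 bytes.
  val MaxMethodBytes: Int = 65535

  // Assumption: aggregation columns plus the grouping key and the count column.
  val FailingColumns: Int = 2260 + 2
  val WorkingColumns: Int = 2250 + 2

  /** Upper bound on the average bytecode per column if `columns` columns are to fit. */
  def perColumnBudget(columns: Int): Double = MaxMethodBytes.toDouble / columns

  def main(args: Array[String]): Unit = {
    println(f"budget at $FailingColumns columns: ${perColumnBudget(FailingColumns)}%.2f bytes")
    println(f"budget at $WorkingColumns columns: ${perColumnBudget(WorkingColumns)}%.2f bytes")
  }
}
```

Under these assumptions the two budgets fall just below and just above 29 bytes, so a per-column cost of about 29 bytes of bytecode would explain why ten extra columns are enough to push the method past the limit. Any fixed overhead in the method would lower that estimate.
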
> If the final DataFrame is not cached, the 2260-aggregation case also works:
> {code}
> val df = sc.parallelize(1 to 10).toDF()
> val aggr = (1 to 2260).map(colnum => avg(df.col("_1")).as(s"col_$colnum"))
> val res = df.groupBy("_1").agg(count("_1"), aggr: _*) // no .cache() call
> res.show() // this will work
> {code}
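
Since narrower cached DataFrames and uncached wide ones both work, one possible workaround on affected versions might be to cache the result as several narrower column groups instead of one wide DataFrame. The helper below is a hypothetical sketch of the grouping step only, in plain Scala. The name `ColumnGroups.split` and the group size of 1000 are assumptions for illustration, and whether this actually avoids the limit has not been tested here.

```scala
object ColumnGroups {
  /**
   * Splits `valueColumns` into groups of at most `maxPerGroup` names and
   * prepends `keyColumn` to each group so the pieces can be joined back later.
   */
  def split(keyColumn: String, valueColumns: Seq[String], maxPerGroup: Int): Seq[Seq[String]] = {
    require(maxPerGroup > 0, "maxPerGroup must be positive")
    valueColumns.grouped(maxPerGroup).map(group => keyColumn +: group).toVector
  }

  def main(args: Array[String]): Unit = {
    val names = (1 to 2260).map(n => s"col_$n")
    val groups = split("_1", names, 1000)
    groups.foreach(g => println(s"${g.size} columns, first value column: ${g(1)}"))
    // With Spark on the classpath, and `res` being the uncached result from the
    // report, each group could then be cached separately (untested assumption):
    //   val parts = groups.map(g => res.select(g.map(res.col): _*).cache())
  }
}
```

Each group keeps the grouping key so the cached pieces could be joined again on `_1` if the full width is needed downstream.
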



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
