Posted to issues@spark.apache.org by "Davies Liu (JIRA)" <ji...@apache.org> on 2015/08/17 20:57:46 UTC

[jira] [Assigned] (SPARK-10038) TungstenProject code generation fails when applied to array

     [ https://issues.apache.org/jira/browse/SPARK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Davies Liu reassigned SPARK-10038:
----------------------------------

    Assignee: Davies Liu

> TungstenProject code generation fails when applied to array<binary>
> -------------------------------------------------------------------
>
>                 Key: SPARK-10038
>                 URL: https://issues.apache.org/jira/browse/SPARK-10038
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Josh Rosen
>            Assignee: Davies Liu
>            Priority: Blocker
>
> During fuzz testing, I discovered that TungstenProject can crash when applied to schemas that contain {{array<binary>}} columns.  As a minimal example, the following code crashes in spark-shell:
> {code}
> sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").rdd.count()
> {code}
> Here's the stacktrace:
> {code}
> 15/08/16 17:11:49 ERROR Executor: Exception in task 3.0 in stage 29.0 (TID 144)
> java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: Line 53, Column 63: '{' expected instead of '['
> public Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] exprs) {
>   return new SpecificUnsafeProjection(exprs);
> }
> class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
>   private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
>   private UnsafeRow convertedStruct2;
>   private byte[] buffer3;
>   private int cursor4;
>   private UnsafeArrayData convertedArray6;
>   private byte[] buffer7;
>   public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
>     this.expressions = expressions;
>     this.convertedStruct2 = new UnsafeRow();
>     this.buffer3 = new byte[16];
>     this.cursor4 = 0;
>     convertedArray6 = new UnsafeArrayData();
>     buffer7 = new byte[64];
>   }
>   // Scala.Function1 need this
>   public Object apply(Object row) {
>     return apply((InternalRow) row);
>   }
>   public UnsafeRow apply(InternalRow i) {
>     cursor4 = 16;
>     convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, cursor4);
>     /* input[0, ArrayType(BinaryType,true)] */
>     boolean isNull0 = i.isNullAt(0);
>     ArrayData primitive1 = isNull0 ? null : (i.getArray(0));
>     final boolean isNull8 = isNull0;
>     if (!isNull8) {
>       final ArrayData tmp9 = primitive1;
>       if (tmp9 instanceof UnsafeArrayData) {
>         convertedArray6 = (UnsafeArrayData) tmp9;
>       } else {
>         final int numElements10 = tmp9.numElements();
>         final int fixedSize11 = 4 * numElements10;
>         int numBytes12 = fixedSize11;
>         final byte[][] elements13 = new byte[][numElements10];
>         for (int index15 = 0; index15 < numElements10; index15++) {
>           if (!tmp9.isNullAt(index15)) {
>             elements13[index15] = tmp9.getBinary(index15);
>             numBytes12 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.getSize(elements13[index15]);
>           }
>         }
>         if (numBytes12 > buffer7.length) {
>           buffer7 = new byte[numBytes12];
>         }
>         int cursor14 = fixedSize11;
>         for (int index15 = 0; index15 < numElements10; index15++) {
>           if (elements13[index15] == null) {
>             // If element is null, write the negative value address into offset region.
>             Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, -cursor14);
>           } else {
>             Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, cursor14);
>             cursor14 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.write(
>               buffer7,
>               Platform.BYTE_ARRAY_OFFSET + cursor14,
>               elements13[index15]);
>           }
>         }
>         convertedArray6.pointTo(
>           buffer7,
>           Platform.BYTE_ARRAY_OFFSET,
>           numElements10,
>           numBytes12);
>       }
>     }
>     int numBytes16 = cursor4 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.getSize(convertedArray6));
>     if (buffer3.length < numBytes16) {
>       // This will not happen frequently, because the buffer is re-used.
>       byte[] tmpBuffer5 = new byte[numBytes16 * 2];
>       Platform.copyMemory(buffer3, Platform.BYTE_ARRAY_OFFSET,
>         tmpBuffer5, Platform.BYTE_ARRAY_OFFSET, buffer3.length);
>       buffer3 = tmpBuffer5;
>     }
>     convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, numBytes16);
>     if (isNull8) {
>       convertedStruct2.setNullAt(0);
>     } else {
>       cursor4 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.write(convertedStruct2, 0, cursor4, convertedArray6);
>     }
>     return convertedStruct2;
>   }
> }
> 	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
> 	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
> 	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> 	at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
> 	at com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
> 	at com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
> 	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2251)
> 	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
> 	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> 	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:362)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:469)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
> 	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:425)
> 	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:124)
> 	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:134)
> 	at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:85)
> 	at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:80)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:88)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
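> The {{'{' expected instead of '['}} error appears to come from the generated statement {{final byte[][] elements13 = new byte[][numElements10];}}: a Java array-creation expression must put the length in the leftmost bracket pair, so Janino stops at the second {{[}} where it expects an array initializer. A minimal sketch of the legal form, shown only to illustrate the syntax rule (not a tested patch for the code generator):
> {code}
> // Invalid, as currently generated: the length sits in the second dimension.
> //   final byte[][] elements13 = new byte[][numElements10];
> // Valid Java: the length goes in the leftmost brackets, and the inner
> // byte[] elements are assigned later in the per-element loop.
> final byte[][] elements13 = new byte[numElements10][];
> {code}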
> Here's the {{explain}} output:
> {code}
> scala> sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").explain(true)
> == Parsed Logical Plan ==
> 'Project [unresolvedalias('_1)]
>  LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
> == Analyzed Logical Plan ==
> _1: array<binary>
> Project [_1#161]
>  LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
> == Optimized Logical Plan ==
> Project [_1#161]
>  LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
> == Physical Plan ==
> TungstenProject [_1#161]
>  Scan PhysicalRDD[_1#161,_2#162]
> {code}


