You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Davies Liu (JIRA)" <ji...@apache.org> on 2015/08/17 20:57:46 UTC
[jira] [Assigned] (SPARK-10038) TungstenProject code generation
fails when applied to array<binary>
[ https://issues.apache.org/jira/browse/SPARK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Davies Liu reassigned SPARK-10038:
----------------------------------
Assignee: Davies Liu
> TungstenProject code generation fails when applied to array<binary>
> -------------------------------------------------------------------
>
> Key: SPARK-10038
> URL: https://issues.apache.org/jira/browse/SPARK-10038
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.5.0
> Reporter: Josh Rosen
> Assignee: Davies Liu
> Priority: Blocker
>
> During fuzz testing, I discovered that TungstenProject can crash when applied to schemas that contain {{array<binary>}} columns. As a minimal example, the following code crashes in spark-shell:
> {code}
> sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").rdd.count()
> {code}
> Here's the stacktrace:
> {code}
> 15/08/16 17:11:49 ERROR Executor: Exception in task 3.0 in stage 29.0 (TID 144)
> java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: Line 53, Column 63: '{' expected instead of '['
> public Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] exprs) {
> return new SpecificUnsafeProjection(exprs);
> }
> class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
> private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
> private UnsafeRow convertedStruct2;
> private byte[] buffer3;
> private int cursor4;
> private UnsafeArrayData convertedArray6;
> private byte[] buffer7;
> public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
> this.expressions = expressions;
> this.convertedStruct2 = new UnsafeRow();
> this.buffer3 = new byte[16];
> this.cursor4 = 0;
> convertedArray6 = new UnsafeArrayData();
> buffer7 = new byte[64];
> }
> // Scala.Function1 need this
> public Object apply(Object row) {
> return apply((InternalRow) row);
> }
> public UnsafeRow apply(InternalRow i) {
> cursor4 = 16;
> convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, cursor4);
> /* input[0, ArrayType(BinaryType,true)] */
> boolean isNull0 = i.isNullAt(0);
> ArrayData primitive1 = isNull0 ? null : (i.getArray(0));
> final boolean isNull8 = isNull0;
> if (!isNull8) {
> final ArrayData tmp9 = primitive1;
> if (tmp9 instanceof UnsafeArrayData) {
> convertedArray6 = (UnsafeArrayData) tmp9;
> } else {
> final int numElements10 = tmp9.numElements();
> final int fixedSize11 = 4 * numElements10;
> int numBytes12 = fixedSize11;
> final byte[][] elements13 = new byte[][numElements10];
> for (int index15 = 0; index15 < numElements10; index15++) {
> if (!tmp9.isNullAt(index15)) {
> elements13[index15] = tmp9.getBinary(index15);
> numBytes12 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.getSize(elements13[index15]);
> }
> }
> if (numBytes12 > buffer7.length) {
> buffer7 = new byte[numBytes12];
> }
> int cursor14 = fixedSize11;
> for (int index15 = 0; index15 < numElements10; index15++) {
> if (elements13[index15] == null) {
> // If element is null, write the negative value address into offset region.
> Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, -cursor14);
> } else {
> Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, cursor14);
> cursor14 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.write(
> buffer7,
> Platform.BYTE_ARRAY_OFFSET + cursor14,
> elements13[index15]);
> }
> }
> convertedArray6.pointTo(
> buffer7,
> Platform.BYTE_ARRAY_OFFSET,
> numElements10,
> numBytes12);
> }
> }
> int numBytes16 = cursor4 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.getSize(convertedArray6));
> if (buffer3.length < numBytes16) {
> // This will not happen frequently, because the buffer is re-used.
> byte[] tmpBuffer5 = new byte[numBytes16 * 2];
> Platform.copyMemory(buffer3, Platform.BYTE_ARRAY_OFFSET,
> tmpBuffer5, Platform.BYTE_ARRAY_OFFSET, buffer3.length);
> buffer3 = tmpBuffer5;
> }
> convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, numBytes16);
> if (isNull8) {
> convertedStruct2.setNullAt(0);
> } else {
> cursor4 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.write(convertedStruct2, 0, cursor4, convertedArray6);
> }
> return convertedStruct2;
> }
> }
> at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
> at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
> at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
> at com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
> at com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
> at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2251)
> at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
> at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
> at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:362)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:469)
> at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:425)
> at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:124)
> at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:134)
> at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:85)
> at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:80)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> Here's the {{explain}} output:
> {code}
> scala> sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").explain(true)
> == Parsed Logical Plan ==
> 'Project [unresolvedalias('_1)]
> LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
> == Analyzed Logical Plan ==
> _1: array<binary>
> Project [_1#161]
> LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
> == Optimized Logical Plan ==
> Project [_1#161]
> LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
> == Physical Plan ==
> TungstenProject [_1#161]
> Scan PhysicalRDD[_1#161,_2#162]
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org