You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/08/17 02:17:45 UTC
[jira] [Created] (SPARK-10038) TungstenProject code generation
fails when applied to array&lt;binary&gt;
Josh Rosen created SPARK-10038:
----------------------------------
Summary: TungstenProject code generation fails when applied to array<binary>
Key: SPARK-10038
URL: https://issues.apache.org/jira/browse/SPARK-10038
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 1.5.0
Reporter: Josh Rosen
During fuzz testing, I discovered that TungstenProject can crash when applied to schemas that contain {{array<binary>}} columns. As a minimal example, the following code crashes in spark-shell:
{code}
sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").rdd.count()
{code}
Here's the stacktrace:
{code}
15/08/16 17:11:49 ERROR Executor: Exception in task 3.0 in stage 29.0 (TID 144)
java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: Line 53, Column 63: '{' expected instead of '['
public Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] exprs) {
return new SpecificUnsafeProjection(exprs);
}
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
private UnsafeRow convertedStruct2;
private byte[] buffer3;
private int cursor4;
private UnsafeArrayData convertedArray6;
private byte[] buffer7;
public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
this.expressions = expressions;
this.convertedStruct2 = new UnsafeRow();
this.buffer3 = new byte[16];
this.cursor4 = 0;
convertedArray6 = new UnsafeArrayData();
buffer7 = new byte[64];
}
// Scala.Function1 need this
public Object apply(Object row) {
return apply((InternalRow) row);
}
public UnsafeRow apply(InternalRow i) {
cursor4 = 16;
convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, cursor4);
/* input[0, ArrayType(BinaryType,true)] */
boolean isNull0 = i.isNullAt(0);
ArrayData primitive1 = isNull0 ? null : (i.getArray(0));
final boolean isNull8 = isNull0;
if (!isNull8) {
final ArrayData tmp9 = primitive1;
if (tmp9 instanceof UnsafeArrayData) {
convertedArray6 = (UnsafeArrayData) tmp9;
} else {
final int numElements10 = tmp9.numElements();
final int fixedSize11 = 4 * numElements10;
int numBytes12 = fixedSize11;
final byte[][] elements13 = new byte[][numElements10];
for (int index15 = 0; index15 < numElements10; index15++) {
if (!tmp9.isNullAt(index15)) {
elements13[index15] = tmp9.getBinary(index15);
numBytes12 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.getSize(elements13[index15]);
}
}
if (numBytes12 > buffer7.length) {
buffer7 = new byte[numBytes12];
}
int cursor14 = fixedSize11;
for (int index15 = 0; index15 < numElements10; index15++) {
if (elements13[index15] == null) {
// If element is null, write the negative value address into offset region.
Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, -cursor14);
} else {
Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, cursor14);
cursor14 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.write(
buffer7,
Platform.BYTE_ARRAY_OFFSET + cursor14,
elements13[index15]);
}
}
convertedArray6.pointTo(
buffer7,
Platform.BYTE_ARRAY_OFFSET,
numElements10,
numBytes12);
}
}
int numBytes16 = cursor4 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.getSize(convertedArray6));
if (buffer3.length < numBytes16) {
// This will not happen frequently, because the buffer is re-used.
byte[] tmpBuffer5 = new byte[numBytes16 * 2];
Platform.copyMemory(buffer3, Platform.BYTE_ARRAY_OFFSET,
tmpBuffer5, Platform.BYTE_ARRAY_OFFSET, buffer3.length);
buffer3 = tmpBuffer5;
}
convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, numBytes16);
if (isNull8) {
convertedStruct2.setNullAt(0);
} else {
cursor4 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.write(convertedStruct2, 0, cursor4, convertedArray6);
}
return convertedStruct2;
}
}
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
at com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
at com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2251)
at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:362)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:469)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:425)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:124)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:134)
at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:85)
at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
{code}
Here's the {{explain}} output:
{code}
scala> sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('_1)]
LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
== Analyzed Logical Plan ==
_1: array<binary>
Project [_1#161]
LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
== Optimized Logical Plan ==
Project [_1#161]
LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22
== Physical Plan ==
TungstenProject [_1#161]
Scan PhysicalRDD[_1#161,_2#162]
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org