Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2015/08/17 02:17:45 UTC

[jira] [Created] (SPARK-10038) TungstenProject code generation fails when applied to array<binary>

Josh Rosen created SPARK-10038:
----------------------------------

             Summary: TungstenProject code generation fails when applied to array<binary>
                 Key: SPARK-10038
                 URL: https://issues.apache.org/jira/browse/SPARK-10038
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.5.0
            Reporter: Josh Rosen


During fuzz testing, I discovered that TungstenProject can crash when applied to schemas that contain {{array<binary>}} columns.  As a minimal example, the following code crashes in spark-shell:

{code}
sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").rdd.count()
{code}

Here's the stacktrace:

{code}
15/08/16 17:11:49 ERROR Executor: Exception in task 3.0 in stage 29.0 (TID 144)
java.util.concurrent.ExecutionException: java.lang.Exception: failed to compile: org.codehaus.commons.compiler.CompileException: Line 53, Column 63: '{' expected instead of '['

public Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] exprs) {
  return new SpecificUnsafeProjection(exprs);
}

class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {

  private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;

  private UnsafeRow convertedStruct2;
  private byte[] buffer3;
  private int cursor4;
  private UnsafeArrayData convertedArray6;
  private byte[] buffer7;


  public SpecificUnsafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expressions) {
    this.expressions = expressions;
    this.convertedStruct2 = new UnsafeRow();
    this.buffer3 = new byte[16];
    this.cursor4 = 0;
    convertedArray6 = new UnsafeArrayData();
    buffer7 = new byte[64];
  }

  // Scala.Function1 need this
  public Object apply(Object row) {
    return apply((InternalRow) row);
  }

  public UnsafeRow apply(InternalRow i) {

    cursor4 = 16;
    convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, cursor4);


    /* input[0, ArrayType(BinaryType,true)] */

    boolean isNull0 = i.isNullAt(0);
    ArrayData primitive1 = isNull0 ? null : (i.getArray(0));

    final boolean isNull8 = isNull0;
    if (!isNull8) {
      final ArrayData tmp9 = primitive1;
      if (tmp9 instanceof UnsafeArrayData) {
        convertedArray6 = (UnsafeArrayData) tmp9;
      } else {
        final int numElements10 = tmp9.numElements();
        final int fixedSize11 = 4 * numElements10;
        int numBytes12 = fixedSize11;


        final byte[][] elements13 = new byte[][numElements10];
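        // ^ this is the allocation the compiler rejects ("'{' expected
        //   instead of '['"): Java requires the outermost dimension to be
        //   sized first, i.e. new byte[numElements10][].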
        for (int index15 = 0; index15 < numElements10; index15++) {

          if (!tmp9.isNullAt(index15)) {
            elements13[index15] = tmp9.getBinary(index15);
            numBytes12 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.getSize(elements13[index15]);
          }
        }


        if (numBytes12 > buffer7.length) {
          buffer7 = new byte[numBytes12];
        }

        int cursor14 = fixedSize11;
        for (int index15 = 0; index15 < numElements10; index15++) {
          if (elements13[index15] == null) {
            // If element is null, write the negative value address into offset region.
            Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, -cursor14);
          } else {
            Platform.putInt(buffer7, Platform.BYTE_ARRAY_OFFSET + 4 * index15, cursor14);

            cursor14 += org.apache.spark.sql.catalyst.expressions.UnsafeWriters$BinaryWriter.write(
              buffer7,
              Platform.BYTE_ARRAY_OFFSET + cursor14,
              elements13[index15]);

          }
        }

        convertedArray6.pointTo(
          buffer7,
          Platform.BYTE_ARRAY_OFFSET,
          numElements10,
          numBytes12);
      }
    }


    int numBytes16 = cursor4 + (isNull8 ? 0 : org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.getSize(convertedArray6));
    if (buffer3.length < numBytes16) {
      // This will not happen frequently, because the buffer is re-used.
      byte[] tmpBuffer5 = new byte[numBytes16 * 2];
      Platform.copyMemory(buffer3, Platform.BYTE_ARRAY_OFFSET,
        tmpBuffer5, Platform.BYTE_ARRAY_OFFSET, buffer3.length);
      buffer3 = tmpBuffer5;
    }
    convertedStruct2.pointTo(buffer3, Platform.BYTE_ARRAY_OFFSET, 1, numBytes16);


    if (isNull8) {
      convertedStruct2.setNullAt(0);
    } else {
      cursor4 += org.apache.spark.sql.catalyst.expressions.UnsafeRowWriters$ArrayWriter.write(convertedStruct2, 0, cursor4, convertedArray6);
    }



    return convertedStruct2;
  }
}


	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
	at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
	at com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3620)
	at com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2362)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2251)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
	at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
	at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.compile(CodeGenerator.scala:362)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:469)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:32)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:425)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:124)
	at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:134)
	at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:85)
	at org.apache.spark.sql.execution.TungstenProject$$anonfun$2.apply(basicOperators.scala:80)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
{code}
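
The compile failure appears to come from the generated allocation {{final byte[][] elements13 = new byte[][numElements10];}}: Java only lets you size the outermost dimension when creating a nested array, so the compiler rejects the {{[}} where it expects a {{{}}. A minimal sketch of the distinction in plain Java (class and variable names are illustrative):

{code}
class ArrayAllocDemo {
  public static void main(String[] args) {
    int numElements = 3;

    // Invalid -- does not compile; this is the shape the codegen emits:
    //   byte[][] elements = new byte[][numElements];

    // Valid -- size only the outermost dimension; the inner arrays stay
    // null until they are filled in, exactly as the generated loop does:
    byte[][] elements = new byte[numElements][];
    elements[0] = new byte[] {1};
    System.out.println(elements.length + " slots, first = "
        + java.util.Arrays.toString(elements[0]));
  }
}
{code}

The fix is presumably to have the generator emit {{new byte[numElements10][]}} (or equivalent) when materializing the elements of an {{array<binary>}} column.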

Here's the {{explain}} output:

{code}
scala> sc.parallelize(Seq((Array(Array[Byte](1)), 1))).toDF.select("_1").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('_1)]
 LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22

== Analyzed Logical Plan ==
_1: array<binary>
Project [_1#161]
 LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [_1#161]
 LogicalRDD [_1#161,_2#162], MapPartitionsRDD[187] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
TungstenProject [_1#161]
 Scan PhysicalRDD[_1#161,_2#162]
{code}
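
As a possible workaround until this is fixed (untested), setting {{spark.sql.tungsten.enabled}} to {{false}} should make the planner fall back to the non-Tungsten {{Project}} operator and avoid this code generation path.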


