Posted to commits@spark.apache.org by hv...@apache.org on 2016/08/25 09:24:56 UTC

spark git commit: [SPARK-17061][SPARK-17093][SQL] `MapObjects` should make copies of unsafe-backed data

Repository: spark
Updated Branches:
  refs/heads/master 2bcd5d5ce -> e0b20f9f2


[SPARK-17061][SPARK-17093][SQL] `MapObjects` should make copies of unsafe-backed data

## What changes were proposed in this pull request?

Currently `MapObjects` does not make copies of unsafe-backed data, leading to problems such as [SPARK-17061](https://issues.apache.org/jira/browse/SPARK-17061) and [SPARK-17093](https://issues.apache.org/jira/browse/SPARK-17093).

This patch makes `MapObjects` copy unsafe-backed data before storing it.
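
For illustration, the failure mode is easiest to see with a code-generated `UnsafeProjection`: it reuses a single row buffer across invocations, so references stored without `copy()` all end up aliasing the last value written. A minimal sketch (not part of the patch):

```scala
// Illustrative sketch only: shows the buffer reuse that this patch guards against.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, IntegerType}

val proj = UnsafeProjection.create(Array[DataType](IntegerType))

// Stored directly, all three references point at the projection's reused buffer,
// so they all observe the last value written.
val aliased = Seq(1, 2, 3).map(i => proj(InternalRow(i)))

// Copying first detaches each row from the shared buffer, preserving 1, 2, 3.
val copied = Seq(1, 2, 3).map(i => proj(InternalRow(i)).copy())
```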

Generated code - prior to this patch:
```java
...
/* 295 */ if (isNull12) {
/* 296 */   convertedArray1[loopIndex1] = null;
/* 297 */ } else {
/* 298 */   convertedArray1[loopIndex1] = value12;
/* 299 */ }
...
```

Generated code - after this patch:
```java
...
/* 295 */ if (isNull12) {
/* 296 */   convertedArray1[loopIndex1] = null;
/* 297 */ } else {
/* 298 */   convertedArray1[loopIndex1] = value12 instanceof UnsafeRow? value12.copy() : value12;
/* 299 */ }
...
```
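
In Scala terms, the added guard behaves roughly like the following sketch (the helper name is illustrative, not from the patch); the same pattern applies to `UnsafeArrayData` for array-typed values and `UnsafeMapData` for map-typed values:

```scala
// Illustrative only: mirrors the generated ternary for struct-typed loop values.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def copyIfUnsafe(value: InternalRow): InternalRow = value match {
  case u: UnsafeRow => u.copy() // detach from the reused buffer before storing
  case other        => other    // already-safe rows can be stored as-is
}
```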

## How was this patch tested?

Added a new test case that would fail without this patch.
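
For context, a hypothetical end-to-end check (not part of this patch; assumes a local `SparkSession` in scope as `spark`) would round-trip a Dataset whose rows contain arrays of structs and verify that the collected elements keep their original values:

```scala
// Hypothetical sanity check, not included in the patch.
import spark.implicits._

case class Inner(a: Int, b: Int)
val ds = Seq(Array(Inner(1, 2), Inner(3, 4))).toDS()

// Without the copy in MapObjects, re-serialized array elements could all alias the
// same reused UnsafeRow; with the fix, collect() returns the original values.
assert(ds.collect().head.toSeq == Seq(Inner(1, 2), Inner(3, 4)))
```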

Author: Liwei Lin <lw...@gmail.com>

Closes #14698 from lw-lin/mapobjects-copy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0b20f9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0b20f9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0b20f9f

Branch: refs/heads/master
Commit: e0b20f9f24d5c3304bf517a4dcfb0da93be5bc75
Parents: 2bcd5d5
Author: Liwei Lin <lw...@gmail.com>
Authored: Thu Aug 25 11:24:40 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Aug 25 11:24:40 2016 +0200

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala  | 12 ++++++-
 .../expressions/ExpressionEvalHelper.scala      |  2 +-
 .../expressions/ObjectExpressionsSuite.scala    | 34 ++++++++++++++++++++
 3 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0b20f9f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 31ed485..4da74a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -494,6 +494,16 @@ case class MapObjects private(
           s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
     }
 
+    // Make a copy of the data if it's unsafe-backed
+    def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
+      s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"
+    val genFunctionValue = lambdaFunction.dataType match {
+      case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
+      case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
+      case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
+      case _ => genFunction.value
+    }
+
     val loopNullCheck = inputDataType match {
       case _: ArrayType => s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
       // The element of primitive array will never be null.
@@ -521,7 +531,7 @@ case class MapObjects private(
           if (${genFunction.isNull}) {
             $convertedArray[$loopIndex] = null;
           } else {
-            $convertedArray[$loopIndex] = ${genFunction.value};
+            $convertedArray[$loopIndex] = $genFunctionValue;
           }
 
           $loopIndex += 1;

http://git-wip-us.apache.org/repos/asf/spark/blob/e0b20f9f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index d6a9672..668543a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
     // some expression is reusing variable names across different instances.
     // This behavior is tested in ExpressionEvalHelperSuite.
     val plan = generateProject(
-      GenerateUnsafeProjection.generate(
+      UnsafeProjection.create(
         Alias(expression, s"Optimized($expression)1")() ::
           Alias(expression, s"Optimized($expression)2")() :: Nil),
       expression)

http://git-wip-us.apache.org/repos/asf/spark/blob/e0b20f9f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index ee65826..3edcc02 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.objects.Invoke
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
 import org.apache.spark.sql.types.{IntegerType, ObjectType}
 
 
@@ -32,4 +34,36 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val invoke = Invoke(inputObject, "_2", IntegerType)
     checkEvaluationWithGeneratedMutableProjection(invoke, null, inputRow)
   }
+
+  test("MapObjects should make copies of unsafe-backed data") {
+    // test UnsafeRow-backed data
+    val structEncoder = ExpressionEncoder[Array[Tuple2[java.lang.Integer, java.lang.Integer]]]
+    val structInputRow = InternalRow.fromSeq(Seq(Array((1, 2), (3, 4))))
+    val structExpected = new GenericArrayData(
+      Array(InternalRow.fromSeq(Seq(1, 2)), InternalRow.fromSeq(Seq(3, 4))))
+    checkEvalutionWithUnsafeProjection(
+      structEncoder.serializer.head, structExpected, structInputRow)
+
+    // test UnsafeArray-backed data
+    val arrayEncoder = ExpressionEncoder[Array[Array[Int]]]
+    val arrayInputRow = InternalRow.fromSeq(Seq(Array(Array(1, 2), Array(3, 4))))
+    val arrayExpected = new GenericArrayData(
+      Array(new GenericArrayData(Array(1, 2)), new GenericArrayData(Array(3, 4))))
+    checkEvalutionWithUnsafeProjection(
+      arrayEncoder.serializer.head, arrayExpected, arrayInputRow)
+
+    // test UnsafeMap-backed data
+    val mapEncoder = ExpressionEncoder[Array[Map[Int, Int]]]
+    val mapInputRow = InternalRow.fromSeq(Seq(Array(
+      Map(1 -> 100, 2 -> 200), Map(3 -> 300, 4 -> 400))))
+    val mapExpected = new GenericArrayData(Seq(
+      new ArrayBasedMapData(
+        new GenericArrayData(Array(1, 2)),
+        new GenericArrayData(Array(100, 200))),
+      new ArrayBasedMapData(
+        new GenericArrayData(Array(3, 4)),
+        new GenericArrayData(Array(300, 400)))))
+    checkEvalutionWithUnsafeProjection(
+      mapEncoder.serializer.head, mapExpected, mapInputRow)
+  }
 }

