You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/08/25 12:16:30 UTC
spark git commit: [SPARK-17061][SPARK-17093][SQL][BACKPORT] MapObjects should make copies of unsafe-backed data
Repository: spark
Updated Branches:
refs/heads/branch-2.0 88481ea21 -> 184e78b9d
[SPARK-17061][SPARK-17093][SQL][BACKPORT] MapObjects should make copies of unsafe-backed data
## What changes were proposed in this pull request?
This PR backports https://github.com/apache/spark/pull/14698 to branch-2.0.
See that PR for more details. All credit should go to lw-lin.
Author: Herman van Hovell <hv...@databricks.com>
Author: Liwei Lin <lw...@gmail.com>
Closes #14806 from hvanhovell/SPARK-17061.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/184e78b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/184e78b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/184e78b9
Branch: refs/heads/branch-2.0
Commit: 184e78b9d640259ba0720574de060841dc912872
Parents: 88481ea
Author: Liwei Lin <lw...@gmail.com>
Authored: Thu Aug 25 14:16:22 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Aug 25 14:16:22 2016 +0200
----------------------------------------------------------------------
.../catalyst/expressions/objects/objects.scala | 12 ++++-
.../expressions/ExpressionEvalHelper.scala | 2 +-
.../expressions/ObjectExpressionSuite.scala | 56 ++++++++++++++++++++
3 files changed, 68 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/184e78b9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 37ec1a6..1cdda53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -481,6 +481,16 @@ case class MapObjects private(
s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
}
+ // Make a copy of the data if it's unsafe-backed
+ def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
+ s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"
+ val genFunctionValue = lambdaFunction.dataType match {
+ case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
+ case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
+ case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
+ case _ => genFunction.value
+ }
+
val loopNullCheck = inputDataType match {
case _: ArrayType => s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
// The element of primitive array will never be null.
@@ -508,7 +518,7 @@ case class MapObjects private(
if (${genFunction.isNull}) {
$convertedArray[$loopIndex] = null;
} else {
- $convertedArray[$loopIndex] = ${genFunction.value};
+ $convertedArray[$loopIndex] = $genFunctionValue;
}
$loopIndex += 1;
http://git-wip-us.apache.org/repos/asf/spark/blob/184e78b9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index d6a9672..668543a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
// some expression is reusing variable names across different instances.
// This behavior is tested in ExpressionEvalHelperSuite.
val plan = generateProject(
- GenerateUnsafeProjection.generate(
+ UnsafeProjection.create(
Alias(expression, s"Optimized($expression)1")() ::
Alias(expression, s"Optimized($expression)2")() :: Nil),
expression)
http://git-wip-us.apache.org/repos/asf/spark/blob/184e78b9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionSuite.scala
new file mode 100644
index 0000000..b6263e7
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+
+class ObjectExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
+ test("MapObjects should make copies of unsafe-backed data") {
+ // test UnsafeRow-backed data
+ val structEncoder = ExpressionEncoder[Array[(java.lang.Integer, java.lang.Integer)]]()
+ val structInputRow = InternalRow.fromSeq(Seq(Array((1, 2), (3, 4))))
+ val structExpected = new GenericArrayData(
+ Array(InternalRow.fromSeq(Seq(1, 2)), InternalRow.fromSeq(Seq(3, 4))))
+ checkEvalutionWithUnsafeProjection(
+ structEncoder.serializer.head, structExpected, structInputRow)
+
+ // test UnsafeArray-backed data
+ val arrayEncoder = ExpressionEncoder[Array[Array[Int]]]()
+ val arrayInputRow = InternalRow.fromSeq(Seq(Array(Array(1, 2), Array(3, 4))))
+ val arrayExpected = new GenericArrayData(
+ Array(new GenericArrayData(Array(1, 2)), new GenericArrayData(Array(3, 4))))
+ checkEvalutionWithUnsafeProjection(
+ arrayEncoder.serializer.head, arrayExpected, arrayInputRow)
+
+ // test UnsafeMap-backed data
+ val mapEncoder = ExpressionEncoder[Array[Map[Int, Int]]]()
+ val mapInputRow = InternalRow.fromSeq(Seq(Array(
+ Map(1 -> 100, 2 -> 200), Map(3 -> 300, 4 -> 400))))
+ val mapExpected = new GenericArrayData(Seq(
+ new ArrayBasedMapData(
+ new GenericArrayData(Array(1, 2)),
+ new GenericArrayData(Array(100, 200))),
+ new ArrayBasedMapData(
+ new GenericArrayData(Array(3, 4)),
+ new GenericArrayData(Array(300, 400)))))
+ checkEvalutionWithUnsafeProjection(
+ mapEncoder.serializer.head, mapExpected, mapInputRow)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org