Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/19 12:43:23 UTC
spark git commit: [SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution
Repository: spark
Updated Branches:
refs/heads/master 1b08c4393 -> e13416502
[SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution
## What changes were proposed in this pull request?
This PR adds interpreted execution support for `CatalystToExternalMap`.
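For context, interpreted execution means `eval` now builds the external map directly in Scala instead of throwing `UnsupportedOperationException` and forcing code generation. A minimal stand-alone sketch of the conversion loop the new `eval` performs (plain Scala; the arrays are hypothetical stand-ins for Catalyst's `MapData` key/value arrays):

```scala
import scala.collection.mutable.Builder

// Hypothetical stand-ins for the key and value arrays of a Catalyst MapData.
val keys = Array[AnyRef](Integer.valueOf(0), Integer.valueOf(1))
val values = Array[AnyRef]("v0", "v1")

// Same shape as the interpreted eval: a size-hinted Builder filled with
// one (key, value) pair per element.
val builder: Builder[(AnyRef, AnyRef), Map[AnyRef, AnyRef]] =
  Map.newBuilder[AnyRef, AnyRef]
builder.sizeHint(keys.length)
var i = 0
while (i < keys.length) {
  builder += Tuple2(keys(i), values(i))
  i += 1
}
builder.result() // Map(0 -> "v0", 1 -> "v1")
```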
## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.
Author: Takeshi Yamamuro <ya...@apache.org>
Closes #20979 from maropu/SPARK-23588.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1341650
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1341650
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1341650
Branch: refs/heads/master
Commit: e13416502f814b04d59bb650953a0114332d163a
Parents: 1b08c43
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Thu Apr 19 14:42:50 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Apr 19 14:42:50 2018 +0200
----------------------------------------------------------------------
.../catalyst/expressions/objects/objects.scala | 39 ++++++++++++++++++--
.../expressions/ObjectExpressionsSuite.scala | 34 ++++++++++++++---
2 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e1341650/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 1645bd7..bc17d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -28,12 +28,12 @@ import scala.util.Try
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer._
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.ScalaReflection.universe.TermName
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -1033,8 +1033,39 @@ case class CatalystToExternalMap private(
override def children: Seq[Expression] =
keyLambdaFunction :: valueLambdaFunction :: inputData :: Nil
- override def eval(input: InternalRow): Any =
- throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+ private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType]
+
+ private lazy val keyConverter =
+ CatalystTypeConverters.createToScalaConverter(inputMapType.keyType)
+ private lazy val valueConverter =
+ CatalystTypeConverters.createToScalaConverter(inputMapType.valueType)
+
+ private def newMapBuilder(): Builder[AnyRef, AnyRef] = {
+ val clazz = Utils.classForName(collClass.getCanonicalName + "$")
+ val module = clazz.getField("MODULE$").get(null)
+ val method = clazz.getMethod("newBuilder")
+ method.invoke(module).asInstanceOf[Builder[AnyRef, AnyRef]]
+ }
+
+ override def eval(input: InternalRow): Any = {
+ val result = inputData.eval(input).asInstanceOf[MapData]
+ if (result != null) {
+ val builder = newMapBuilder()
+ builder.sizeHint(result.numElements())
+ val keyArray = result.keyArray()
+ val valueArray = result.valueArray()
+ var i = 0
+ while (i < result.numElements()) {
+ val key = keyConverter(keyArray.get(i, inputMapType.keyType))
+ val value = valueConverter(valueArray.get(i, inputMapType.valueType))
+ builder += Tuple2(key, value)
+ i += 1
+ }
+ builder.result()
+ } else {
+ null
+ }
+ }
override def dataType: DataType = ObjectType(collClass)
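A note on the reflective `newMapBuilder` helper above: the target collection class is only known at runtime, so the companion object is looked up via JVM reflection rather than called directly. Below is a self-contained sketch of that pattern, assuming the class follows the standard Scala compilation scheme (a companion class named `<class>$` with a static `MODULE$` field and a parameterless `newBuilder` method):

```scala
import scala.collection.mutable.Builder

// Hedged sketch of the companion-object lookup done by newMapBuilder.
// For scala.collection.immutable.Map, the companion object is compiled
// into the class "scala.collection.immutable.Map$", whose singleton
// instance is held in the public static field MODULE$.
def companionNewBuilder(collClass: Class[_]): Builder[AnyRef, AnyRef] = {
  val moduleClass = Class.forName(collClass.getCanonicalName + "$")
  val module = moduleClass.getField("MODULE$").get(null)
  val method = moduleClass.getMethod("newBuilder")
  method.invoke(module).asInstanceOf[Builder[AnyRef, AnyRef]]
}

val builder = companionNewBuilder(classOf[Map[_, _]])
builder += Tuple2("k", "v")
builder.result() // Map(k -> v)
```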
http://git-wip-us.apache.org/repos/asf/spark/blob/e1341650/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index bf805f4..bcd035c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -27,12 +27,14 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
-import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.objects._
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{SQLDate, SQLTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -162,9 +164,10 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
"fromPrimitiveArray", ObjectType(classOf[Array[Int]]),
Array[Int](1, 2, 3), UnsafeArrayData.fromPrimitiveArray(Array[Int](1, 2, 3))),
(DateTimeUtils.getClass, ObjectType(classOf[Date]),
- "toJavaDate", ObjectType(classOf[SQLDate]), 77777, DateTimeUtils.toJavaDate(77777)),
+ "toJavaDate", ObjectType(classOf[DateTimeUtils.SQLDate]), 77777,
+ DateTimeUtils.toJavaDate(77777)),
(DateTimeUtils.getClass, ObjectType(classOf[Timestamp]),
- "toJavaTimestamp", ObjectType(classOf[SQLTimestamp]),
+ "toJavaTimestamp", ObjectType(classOf[DateTimeUtils.SQLTimestamp]),
88888888.toLong, DateTimeUtils.toJavaTimestamp(88888888))
).foreach { case (cls, dataType, methodName, argType, arg, expected) =>
checkObjectExprEvaluation(StaticInvoke(cls, dataType, methodName,
@@ -450,6 +453,25 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
}
+
+ implicit private def mapIntStrEncoder = ExpressionEncoder[Map[Int, String]]()
+
+ test("SPARK-23588 CatalystToExternalMap should support interpreted execution") {
+ // To get a resolved `CatalystToExternalMap` expression, we build a deserializer plan
+ // with dummy input, resolve the plan by the analyzer, and replace the dummy input
+ // with a literal for tests.
+ val unresolvedDeser = UnresolvedDeserializer(encoderFor[Map[Int, String]].deserializer)
+ val dummyInputPlan = LocalRelation('value.map(MapType(IntegerType, StringType)))
+ val plan = Project(Alias(unresolvedDeser, "none")() :: Nil, dummyInputPlan)
+
+ val analyzedPlan = SimpleAnalyzer.execute(plan)
+ val Alias(toMapExpr: CatalystToExternalMap, _) = analyzedPlan.expressions.head
+
+ // Replaces the dummy input with a literal for tests here
+ val data = Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
+ val deserializer = toMapExpr.copy(inputData = Literal.create(data))
+ checkObjectExprEvaluation(deserializer, expected = data)
+ }
}
class TestBean extends Serializable {
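A closing note on the test: `CatalystToExternalMap`'s constructor is private (see the case class declaration in the first hunk), so the test obtains a resolved instance by analyzing a deserializer plan and then swapping the dummy input for a literal. The per-element key and value conversion that the interpreted `eval` relies on is the same machinery exposed by `CatalystTypeConverters`; a hedged round-trip sketch:

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{IntegerType, MapType, StringType}

// Round-trip a Scala map through Catalyst's internal representation.
// createToCatalystConverter/createToScalaConverter are the same entry
// points the interpreted eval uses for individual keys and values.
val mapType = MapType(IntegerType, StringType)
val toCatalyst = CatalystTypeConverters.createToCatalystConverter(mapType)
val toScala = CatalystTypeConverters.createToScalaConverter(mapType)

val data = Map(0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
val internal = toCatalyst(data) // Catalyst MapData (values become UTF8String)
assert(toScala(internal) == data)
```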
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org