Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/19 12:43:23 UTC

spark git commit: [SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution

Repository: spark
Updated Branches:
  refs/heads/master 1b08c4393 -> e13416502


[SPARK-23588][SQL] CatalystToExternalMap should support interpreted execution

## What changes were proposed in this pull request?
This PR adds interpreted execution support for `CatalystToExternalMap`.
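
In interpreted mode, `CatalystToExternalMap.eval` walks the input `MapData`, converts each key and value to its external Scala representation via `CatalystTypeConverters`, and assembles the result with a collection builder. A minimal standalone sketch of that per-element conversion (hypothetical example code, assuming `spark-catalyst` on the classpath; the object name is ours, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object InterpretedMapConversionSketch {
  def main(args: Array[String]): Unit = {
    // A Catalyst MapData holding {1 -> "v1", 2 -> "v2"} in internal form
    // (primitive ints and UTF8Strings rather than java.lang types).
    val mapData = new ArrayBasedMapData(
      new GenericArrayData(Array[Any](1, 2)),
      new GenericArrayData(Array[Any](
        UTF8String.fromString("v1"), UTF8String.fromString("v2"))))

    // Per-type converters, created once, as the patch does lazily from the
    // input MapType.
    val keyConverter = CatalystTypeConverters.createToScalaConverter(IntegerType)
    val valueConverter = CatalystTypeConverters.createToScalaConverter(StringType)

    val builder = Map.newBuilder[Any, Any]
    builder.sizeHint(mapData.numElements())
    val keys = mapData.keyArray()
    val values = mapData.valueArray()
    var i = 0
    while (i < mapData.numElements()) {
      builder += keyConverter(keys.get(i, IntegerType)) ->
        valueConverter(values.get(i, StringType))
      i += 1
    }
    println(builder.result())  // Map(1 -> v1, 2 -> v2)
  }
}
```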

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #20979 from maropu/SPARK-23588.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e1341650
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e1341650
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e1341650

Branch: refs/heads/master
Commit: e13416502f814b04d59bb650953a0114332d163a
Parents: 1b08c43
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Thu Apr 19 14:42:50 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Apr 19 14:42:50 2018 +0200

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala  | 39 ++++++++++++++++++--
 .../expressions/ObjectExpressionsSuite.scala    | 34 ++++++++++++++---
 2 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e1341650/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 1645bd7..bc17d12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -28,12 +28,12 @@ import scala.util.Try
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.ScalaReflection.universe.TermName
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -1033,8 +1033,39 @@ case class CatalystToExternalMap private(
   override def children: Seq[Expression] =
     keyLambdaFunction :: valueLambdaFunction :: inputData :: Nil
 
-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+  private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType]
+
+  private lazy val keyConverter =
+    CatalystTypeConverters.createToScalaConverter(inputMapType.keyType)
+  private lazy val valueConverter =
+    CatalystTypeConverters.createToScalaConverter(inputMapType.valueType)
+
+  private def newMapBuilder(): Builder[AnyRef, AnyRef] = {
+    val clazz = Utils.classForName(collClass.getCanonicalName + "$")
+    val module = clazz.getField("MODULE$").get(null)
+    val method = clazz.getMethod("newBuilder")
+    method.invoke(module).asInstanceOf[Builder[AnyRef, AnyRef]]
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val result = inputData.eval(input).asInstanceOf[MapData]
+    if (result != null) {
+      val builder = newMapBuilder()
+      builder.sizeHint(result.numElements())
+      val keyArray = result.keyArray()
+      val valueArray = result.valueArray()
+      var i = 0
+      while (i < result.numElements()) {
+        val key = keyConverter(keyArray.get(i, inputMapType.keyType))
+        val value = valueConverter(valueArray.get(i, inputMapType.valueType))
+        builder += Tuple2(key, value)
+        i += 1
+      }
+      builder.result()
+    } else {
+      null
+    }
+  }
 
   override def dataType: DataType = ObjectType(collClass)
 
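The `newMapBuilder()` helper above obtains a builder for the target collection class through a common Scala reflection idiom: load the companion object's synthetic `$` class, read its static `MODULE$` singleton field, and invoke `newBuilder` on it. A hedged standalone illustration of that idiom (hypothetical object and method names, not the Spark code itself, which goes through `Utils.classForName`):

```scala
import scala.collection.mutable.Builder

object CompanionBuilderSketch {
  def builderFor(collClass: Class[_]): Builder[AnyRef, AnyRef] = {
    // scala.collection.immutable.HashMap -> scala.collection.immutable.HashMap$
    val companion = Class.forName(collClass.getCanonicalName + "$")
    // A companion object exposes its singleton via a static MODULE$ field.
    val module = companion.getField("MODULE$").get(null)
    companion.getMethod("newBuilder").invoke(module)
      .asInstanceOf[Builder[AnyRef, AnyRef]]
  }

  def main(args: Array[String]): Unit = {
    val builder = builderFor(classOf[scala.collection.immutable.HashMap[_, _]])
    builder += Tuple2("k", "v")
    println(builder.result())  // Map(k -> v)
  }
}
```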

http://git-wip-us.apache.org/repos/asf/spark/blob/e1341650/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index bf805f4..bcd035c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -27,12 +27,14 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.ResolveTimeZone
-import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.objects._
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{SQLDate, SQLTimestamp}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -162,9 +164,10 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
         "fromPrimitiveArray", ObjectType(classOf[Array[Int]]),
         Array[Int](1, 2, 3), UnsafeArrayData.fromPrimitiveArray(Array[Int](1, 2, 3))),
       (DateTimeUtils.getClass, ObjectType(classOf[Date]),
-        "toJavaDate", ObjectType(classOf[SQLDate]), 77777, DateTimeUtils.toJavaDate(77777)),
+        "toJavaDate", ObjectType(classOf[DateTimeUtils.SQLDate]), 77777,
+        DateTimeUtils.toJavaDate(77777)),
       (DateTimeUtils.getClass, ObjectType(classOf[Timestamp]),
-        "toJavaTimestamp", ObjectType(classOf[SQLTimestamp]),
+        "toJavaTimestamp", ObjectType(classOf[DateTimeUtils.SQLTimestamp]),
         88888888.toLong, DateTimeUtils.toJavaTimestamp(88888888))
     ).foreach { case (cls, dataType, methodName, argType, arg, expected) =>
       checkObjectExprEvaluation(StaticInvoke(cls, dataType, methodName,
@@ -450,6 +453,25 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       }
     }
   }
+
+  implicit private def mapIntStrEncoder = ExpressionEncoder[Map[Int, String]]()
+
+  test("SPARK-23588 CatalystToExternalMap should support interpreted execution") {
+    // To get a resolved `CatalystToExternalMap` expression, we build a deserializer plan
+    // with dummy input, resolve the plan by the analyzer, and replace the dummy input
+    // with a literal for tests.
+    val unresolvedDeser = UnresolvedDeserializer(encoderFor[Map[Int, String]].deserializer)
+    val dummyInputPlan = LocalRelation('value.map(MapType(IntegerType, StringType)))
+    val plan = Project(Alias(unresolvedDeser, "none")() :: Nil, dummyInputPlan)
+
+    val analyzedPlan = SimpleAnalyzer.execute(plan)
+    val Alias(toMapExpr: CatalystToExternalMap, _) = analyzedPlan.expressions.head
+
+    // Replaces the dummy input with a literal for tests here
+    val data = Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
+    val deserializer = toMapExpr.copy(inputData = Literal.create(data))
+    checkObjectExprEvaluation(deserializer, expected = data)
+  }
 }
 
 class TestBean extends Serializable {
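
For reference, here is a hedged distillation of what the new test drives end to end (it mirrors the test above rather than adding anything new; assumes `spark-catalyst` and its expression DSL on the classpath):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedDeserializer}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.expressions.objects.CatalystToExternalMap
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.types.{IntegerType, MapType, StringType}

object InterpretedEvalSketch {
  // encoderFor needs an implicit encoder for the external Map type.
  implicit val enc: ExpressionEncoder[Map[Int, String]] = ExpressionEncoder()

  def main(args: Array[String]): Unit = {
    // Build a deserializer plan over a dummy map-typed input and let the
    // analyzer resolve it, exactly as the test does.
    val deser = UnresolvedDeserializer(encoderFor[Map[Int, String]].deserializer)
    val plan = SimpleAnalyzer.execute(
      Project(Alias(deser, "out")() :: Nil,
        LocalRelation('value.map(MapType(IntegerType, StringType)))))
    val Alias(toMap: CatalystToExternalMap, _) = plan.expressions.head

    // Swap in a literal map and call eval() directly: the interpreted path
    // that threw UnsupportedOperationException before this patch.
    val expr = toMap.copy(inputData = Literal.create(Map(1 -> "a", 2 -> "b")))
    println(expr.eval(InternalRow.empty))  // Map(1 -> a, 2 -> b)
  }
}
```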

