You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/23 12:28:33 UTC

spark git commit: [SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted execution

Repository: spark
Updated Branches:
  refs/heads/master d87d30e4f -> afbdf4273


[SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted execution

## What changes were proposed in this pull request?
This PR adds support for interpreted execution to `ExternalMapToCatalyst`, which previously supported only code-generated evaluation.

## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #20980 from maropu/SPARK-23589.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afbdf427
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afbdf427
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afbdf427

Branch: refs/heads/master
Commit: afbdf427302aba858f95205ecef7667f412b2a6a
Parents: d87d30e
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Mon Apr 23 14:28:28 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Mon Apr 23 14:28:28 2018 +0200

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala  |  60 ++++++++++-
 .../expressions/ObjectExpressionsSuite.scala    | 108 ++++++++++++++++++-
 2 files changed, 165 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/afbdf427/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index f1ffcae..9c7e764 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1255,8 +1255,64 @@ case class ExternalMapToCatalyst private(
   override def dataType: MapType = MapType(
     keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable)
 
-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+  private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = child.dataType match {
+    case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
+      (input: Any) => {
+        val data = input.asInstanceOf[java.util.Map[Any, Any]]
+        val keys = new Array[Any](data.size)
+        val values = new Array[Any](data.size)
+        val iter = data.entrySet().iterator()
+        var i = 0
+        while (iter.hasNext) {
+          val entry = iter.next()
+          val (key, value) = (entry.getKey, entry.getValue)
+          keys(i) = if (key != null) {
+            keyConverter.eval(InternalRow.fromSeq(key :: Nil))
+          } else {
+            throw new RuntimeException("Cannot use null as map key!")
+          }
+          values(i) = if (value != null) {
+            valueConverter.eval(InternalRow.fromSeq(value :: Nil))
+          } else {
+            null
+          }
+          i += 1
+        }
+        (keys, values)
+      }
+
+    case ObjectType(cls) if classOf[scala.collection.Map[_, _]].isAssignableFrom(cls) =>
+      (input: Any) => {
+        val data = input.asInstanceOf[scala.collection.Map[Any, Any]]
+        val keys = new Array[Any](data.size)
+        val values = new Array[Any](data.size)
+        var i = 0
+        for ((key, value) <- data) {
+          keys(i) = if (key != null) {
+            keyConverter.eval(InternalRow.fromSeq(key :: Nil))
+          } else {
+            throw new RuntimeException("Cannot use null as map key!")
+          }
+          values(i) = if (value != null) {
+            valueConverter.eval(InternalRow.fromSeq(value :: Nil))
+          } else {
+            null
+          }
+          i += 1
+        }
+        (keys, values)
+      }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val result = child.eval(input)
+    if (result != null) {
+      val (keys, values) = mapCatalystConverter(result)
+      new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values))
+    } else {
+      null
+    }
+  }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val inputMap = child.genCode(ctx)

http://git-wip-us.apache.org/repos/asf/spark/blob/afbdf427/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 7136af8..730b36c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -21,12 +21,13 @@ import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.sql.{RandomDataGenerator, Row}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, JavaTypeInference, ScalaReflection}
 import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.encoders._
@@ -501,6 +502,111 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       InternalRow.fromSeq(Seq(Row(1))),
       "java.lang.Integer is not a valid external type for schema of double")
   }
+
+  private def javaMapSerializerFor(
+      keyClazz: Class[_],
+      valueClazz: Class[_])(inputObject: Expression): Expression = {
+
+    def kvSerializerFor(inputObject: Expression, clazz: Class[_]): Expression = clazz match {
+      case c if c == classOf[java.lang.Integer] =>
+        Invoke(inputObject, "intValue", IntegerType)
+      case c if c == classOf[java.lang.String] =>
+        StaticInvoke(
+          classOf[UTF8String],
+          StringType,
+          "fromString",
+          inputObject :: Nil,
+          returnNullable = false)
+    }
+
+    ExternalMapToCatalyst(
+      inputObject,
+      ObjectType(keyClazz),
+      kvSerializerFor(_, keyClazz),
+      keyNullable = true,
+      ObjectType(valueClazz),
+      kvSerializerFor(_, valueClazz),
+      valueNullable = true
+    )
+  }
+
+  private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = {
+    import org.apache.spark.sql.catalyst.ScalaReflection._
+
+    val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+    def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression =
+         localTypeOf[V].dealias match {
+       case t if t <:< localTypeOf[java.lang.Integer] =>
+         Invoke(inputObject, "intValue", IntegerType)
+       case t if t <:< localTypeOf[String] =>
+        StaticInvoke(
+          classOf[UTF8String],
+          StringType,
+          "fromString",
+          inputObject :: Nil,
+          returnNullable = false)
+       case _ =>
+         inputObject
+    }
+
+    ExternalMapToCatalyst(
+      inputObject,
+      dataTypeFor[T],
+      kvSerializerFor[T],
+      keyNullable = !localTypeOf[T].typeSymbol.asClass.isPrimitive,
+      dataTypeFor[U],
+      kvSerializerFor[U],
+      valueNullable = !localTypeOf[U].typeSymbol.asClass.isPrimitive
+    )
+  }
+
+  test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") {
+    // Simple test
+    val scalaMap = scala.collection.Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
+    val javaMap = new java.util.HashMap[java.lang.Integer, java.lang.String]() {
+      {
+        put(0, "v0")
+        put(1, "v1")
+        put(2, null)
+        put(3, "v3")
+      }
+    }
+    val expected = CatalystTypeConverters.convertToCatalyst(scalaMap)
+
+    // Java Map
+    val serializer1 = javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
+      Literal.fromObject(javaMap))
+    checkEvaluation(serializer1, expected)
+
+    // Scala Map
+    val serializer2 = scalaMapSerializerFor[Int, String](Literal.fromObject(scalaMap))
+    checkEvaluation(serializer2, expected)
+
+    // NULL key test
+    val scalaMapHasNullKey = scala.collection.Map[java.lang.Integer, String](
+      null.asInstanceOf[java.lang.Integer] -> "v0", new java.lang.Integer(1) -> "v1")
+    val javaMapHasNullKey = new java.util.HashMap[java.lang.Integer, java.lang.String]() {
+      {
+        put(null, "v0")
+        put(1, "v1")
+      }
+    }
+
+    // Java Map
+    val serializer3 =
+      javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
+        Literal.fromObject(javaMapHasNullKey))
+    checkExceptionInExpression[RuntimeException](
+      serializer3, EmptyRow, "Cannot use null as map key!")
+
+    // Scala Map
+    val serializer4 = scalaMapSerializerFor[java.lang.Integer, String](
+      Literal.fromObject(scalaMapHasNullKey))
+
+    checkExceptionInExpression[RuntimeException](
+      serializer4, EmptyRow, "Cannot use null as map key!")
+  }
 }
 
 class TestBean extends Serializable {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org