You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/23 12:28:33 UTC
spark git commit: [SPARK-23589][SQL] ExternalMapToCatalyst should
support interpreted execution
Repository: spark
Updated Branches:
refs/heads/master d87d30e4f -> afbdf4273
[SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted execution
## What changes were proposed in this pull request?
This PR adds support for interpreted mode in `ExternalMapToCatalyst`.
## How was this patch tested?
Added tests in `ObjectExpressionsSuite`.
Author: Takeshi Yamamuro <ya...@apache.org>
Closes #20980 from maropu/SPARK-23589.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afbdf427
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afbdf427
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afbdf427
Branch: refs/heads/master
Commit: afbdf427302aba858f95205ecef7667f412b2a6a
Parents: d87d30e
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Mon Apr 23 14:28:28 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Mon Apr 23 14:28:28 2018 +0200
----------------------------------------------------------------------
.../catalyst/expressions/objects/objects.scala | 60 ++++++++++-
.../expressions/ObjectExpressionsSuite.scala | 108 ++++++++++++++++++-
2 files changed, 165 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/afbdf427/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index f1ffcae..9c7e764 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1255,8 +1255,64 @@ case class ExternalMapToCatalyst private(
override def dataType: MapType = MapType(
keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable)
- override def eval(input: InternalRow): Any =
- throw new UnsupportedOperationException("Only code-generated evaluation is supported")
+ // Interpreted-mode converter: turns the external map object produced by `child`
+ // into two parallel arrays (Catalyst keys, Catalyst values). Built lazily once,
+ // dispatched on the static class carried by the child's ObjectType.
+ // NOTE(review): this match is non-exhaustive — a child.dataType that is neither
+ // a java.util.Map nor a scala.collection.Map ObjectType raises a MatchError when
+ // the lazy val is first forced; presumably analysis guarantees one of the two
+ // map types here — confirm.
+ private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = child.dataType match {
+ // Java maps: walk the entrySet iterator and convert each entry.
+ case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
+ (input: Any) => {
+ val data = input.asInstanceOf[java.util.Map[Any, Any]]
+ val keys = new Array[Any](data.size)
+ val values = new Array[Any](data.size)
+ val iter = data.entrySet().iterator()
+ var i = 0
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val (key, value) = (entry.getKey, entry.getValue)
+ // Each key/value is wrapped in a single-field InternalRow so the
+ // per-element converter expression can evaluate it.
+ keys(i) = if (key != null) {
+ keyConverter.eval(InternalRow.fromSeq(key :: Nil))
+ } else {
+ // Null map keys are rejected, mirroring the code-generated path.
+ throw new RuntimeException("Cannot use null as map key!")
+ }
+ values(i) = if (value != null) {
+ valueConverter.eval(InternalRow.fromSeq(value :: Nil))
+ } else {
+ // Null values are allowed and preserved as-is.
+ null
+ }
+ i += 1
+ }
+ (keys, values)
+ }
+
+ // Scala maps: same conversion, iterating (key, value) pairs directly.
+ case ObjectType(cls) if classOf[scala.collection.Map[_, _]].isAssignableFrom(cls) =>
+ (input: Any) => {
+ val data = input.asInstanceOf[scala.collection.Map[Any, Any]]
+ val keys = new Array[Any](data.size)
+ val values = new Array[Any](data.size)
+ var i = 0
+ for ((key, value) <- data) {
+ keys(i) = if (key != null) {
+ keyConverter.eval(InternalRow.fromSeq(key :: Nil))
+ } else {
+ throw new RuntimeException("Cannot use null as map key!")
+ }
+ values(i) = if (value != null) {
+ valueConverter.eval(InternalRow.fromSeq(value :: Nil))
+ } else {
+ null
+ }
+ i += 1
+ }
+ (keys, values)
+ }
+ }
+
+ // Interpreted evaluation (replaces the former UnsupportedOperationException):
+ // evaluates the child to get the external map object, converts it via
+ // mapCatalystConverter, and wraps the key/value arrays in an
+ // ArrayBasedMapData. A null child result yields a null map.
+ override def eval(input: InternalRow): Any = {
+ val result = child.eval(input)
+ if (result != null) {
+ val (keys, values) = mapCatalystConverter(result)
+ new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values))
+ } else {
+ null
+ }
+ }
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val inputMap = child.genCode(ctx)
http://git-wip-us.apache.org/repos/asf/spark/blob/afbdf427/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 7136af8..730b36c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -21,12 +21,13 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
import scala.util.Random
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.sql.{RandomDataGenerator, Row}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, JavaTypeInference, ScalaReflection}
import org.apache.spark.sql.catalyst.analysis.{ResolveTimeZone, SimpleAnalyzer, UnresolvedDeserializer}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.encoders._
@@ -501,6 +502,111 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
InternalRow.fromSeq(Seq(Row(1))),
"java.lang.Integer is not a valid external type for schema of double")
}
+
+ // Test helper: builds an ExternalMapToCatalyst serializer expression for a
+ // java.util.Map whose key/value classes are given explicitly.
+ private def javaMapSerializerFor(
+ keyClazz: Class[_],
+ valueClazz: Class[_])(inputObject: Expression): Expression = {
+
+ // Maps a boxed Integer or String element to its Catalyst conversion
+ // expression. Intentionally non-exhaustive: only the element types used by
+ // the tests below are supported; anything else is a MatchError.
+ def kvSerializerFor(inputObject: Expression, clazz: Class[_]): Expression = clazz match {
+ case c if c == classOf[java.lang.Integer] =>
+ Invoke(inputObject, "intValue", IntegerType)
+ case c if c == classOf[java.lang.String] =>
+ StaticInvoke(
+ classOf[UTF8String],
+ StringType,
+ "fromString",
+ inputObject :: Nil,
+ returnNullable = false)
+ }
+
+ ExternalMapToCatalyst(
+ inputObject,
+ ObjectType(keyClazz),
+ kvSerializerFor(_, keyClazz),
+ keyNullable = true,
+ ObjectType(valueClazz),
+ kvSerializerFor(_, valueClazz),
+ valueNullable = true
+ )
+ }
+
+ // Test helper: builds an ExternalMapToCatalyst serializer expression for a
+ // scala.collection.Map, deriving key/value handling from the TypeTags.
+ private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = {
+ import org.apache.spark.sql.catalyst.ScalaReflection._
+
+ // NOTE(review): curId is never read anywhere in this helper — looks like a
+ // leftover from copied serializer code; candidate for removal.
+ val curId = new java.util.concurrent.atomic.AtomicInteger()
+
+ // Converts an element of reflected type V; Integer and String get explicit
+ // Catalyst conversions, everything else is passed through unchanged.
+ def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression =
+ localTypeOf[V].dealias match {
+ case t if t <:< localTypeOf[java.lang.Integer] =>
+ Invoke(inputObject, "intValue", IntegerType)
+ case t if t <:< localTypeOf[String] =>
+ StaticInvoke(
+ classOf[UTF8String],
+ StringType,
+ "fromString",
+ inputObject :: Nil,
+ returnNullable = false)
+ case _ =>
+ inputObject
+ }
+
+ ExternalMapToCatalyst(
+ inputObject,
+ dataTypeFor[T],
+ kvSerializerFor[T],
+ // Primitive key/value types cannot be null; boxed/reference types can.
+ keyNullable = !localTypeOf[T].typeSymbol.asClass.isPrimitive,
+ dataTypeFor[U],
+ kvSerializerFor[U],
+ valueNullable = !localTypeOf[U].typeSymbol.asClass.isPrimitive
+ )
+ }
+
+ // Exercises ExternalMapToCatalyst in both codegen and interpreted modes
+ // (checkEvaluation runs both) for Java and Scala maps, including null values
+ // and the null-key error path.
+ test("SPARK-23589 ExternalMapToCatalyst should support interpreted execution") {
+ // Simple test: identical contents in Scala and Java map form; entry 2 -> null
+ // checks that null *values* are preserved.
+ val scalaMap = scala.collection.Map[Int, String](0 -> "v0", 1 -> "v1", 2 -> null, 3 -> "v3")
+ val javaMap = new java.util.HashMap[java.lang.Integer, java.lang.String]() {
+ {
+ put(0, "v0")
+ put(1, "v1")
+ put(2, null)
+ put(3, "v3")
+ }
+ }
+ // Expected Catalyst MapData, derived independently via CatalystTypeConverters.
+ val expected = CatalystTypeConverters.convertToCatalyst(scalaMap)
+
+ // Java Map
+ val serializer1 = javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
+ Literal.fromObject(javaMap))
+ checkEvaluation(serializer1, expected)
+
+ // Scala Map
+ val serializer2 = scalaMapSerializerFor[Int, String](Literal.fromObject(scalaMap))
+ checkEvaluation(serializer2, expected)
+
+ // NULL key test: a null *key* must raise the runtime error in both modes.
+ val scalaMapHasNullKey = scala.collection.Map[java.lang.Integer, String](
+ null.asInstanceOf[java.lang.Integer] -> "v0", new java.lang.Integer(1) -> "v1")
+ val javaMapHasNullKey = new java.util.HashMap[java.lang.Integer, java.lang.String]() {
+ {
+ put(null, "v0")
+ put(1, "v1")
+ }
+ }
+
+ // Java Map
+ val serializer3 =
+ javaMapSerializerFor(classOf[java.lang.Integer], classOf[java.lang.String])(
+ Literal.fromObject(javaMapHasNullKey))
+ checkExceptionInExpression[RuntimeException](
+ serializer3, EmptyRow, "Cannot use null as map key!")
+
+ // Scala Map
+ val serializer4 = scalaMapSerializerFor[java.lang.Integer, String](
+ Literal.fromObject(scalaMapHasNullKey))
+
+ checkExceptionInExpression[RuntimeException](
+ serializer4, EmptyRow, "Cannot use null as map key!")
+ }
}
class TestBean extends Serializable {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org