Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/05 11:39:50 UTC

spark git commit: [SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression

Repository: spark
Updated Branches:
  refs/heads/master d3bd0435e -> c5c8b5440


[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression

## What changes were proposed in this pull request?

Add interpreted execution for `InitializeJavaBean` expression.
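
For context, `InitializeJavaBean` takes a bean-producing expression plus a map from setter names to value expressions, and previously threw `UnsupportedOperationException` when evaluated without codegen. The sketch below, adapted from the new unit test, shows how the expression can now be evaluated directly; it uses internal Catalyst APIs and is illustrative only.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.objects.InitializeJavaBean

// Wrap a LinkedList bean and call its add(1) method as a "setter".
val initializeBean = InitializeJavaBean(
  Literal.fromObject(new java.util.LinkedList[Int]),
  Map("add" -> Literal(1)))

// With this patch, eval() resolves and invokes the setters reflectively
// instead of throwing UnsupportedOperationException.
val bean = initializeBean.eval(InternalRow.fromSeq(Seq()))  // a LinkedList containing 1
```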

## How was this patch tested?

Added unit test.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #20756 from viirya/SPARK-23593.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5c8b544
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5c8b544
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5c8b544

Branch: refs/heads/master
Commit: c5c8b544047a83cb6128a20d31f1d943a15f9260
Parents: d3bd043
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Apr 5 13:39:45 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Apr 5 13:39:45 2018 +0200

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala  | 47 +++++++++++++++++-
 .../expressions/ExpressionEvalHelper.scala      |  9 ++--
 .../expressions/ObjectExpressionsSuite.scala    | 52 ++++++++++++++++++++
 3 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5c8b544/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index a455c1c..20c4f4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1410,8 +1410,47 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
+  private lazy val resolvedSetters = {
+    assert(beanInstance.dataType.isInstanceOf[ObjectType])
+
+    val ObjectType(beanClass) = beanInstance.dataType
+    setters.map {
+      case (name, expr) =>
+        // Look for a setter that matches the known type mapping, but also try a
+        // general `Object`-typed parameter to cover generic methods.
+        val paramTypes = ScalaReflection.expressionJavaClasses(Seq(expr)) ++ Seq(classOf[Object])
+        val methods = paramTypes.flatMap { fieldClass =>
+          try {
+            Some(beanClass.getDeclaredMethod(name, fieldClass))
+          } catch {
+            case e: NoSuchMethodException => None
+          }
+        }
+        if (methods.isEmpty) {
+          throw new NoSuchMethodException(s"""A method named "$name" is not declared """ +
+            "in any enclosing class nor any supertype")
+        }
+        methods.head -> expr
+    }
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val instance = beanInstance.eval(input)
+    if (instance != null) {
+      val bean = instance.asInstanceOf[Object]
+      resolvedSetters.foreach {
+        case (setter, expr) =>
+          val paramVal = expr.eval(input)
+          if (paramVal == null) {
+            throw new NullPointerException("The parameter value for setters in " +
+              "`InitializeJavaBean` can not be null")
+          } else {
+            setter.invoke(bean, paramVal.asInstanceOf[AnyRef])
+          }
+      }
+    }
+    instance
+  }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val instanceGen = beanInstance.genCode(ctx)
@@ -1424,6 +1463,10 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
         val fieldGen = fieldValue.genCode(ctx)
         s"""
            |${fieldGen.code}
+           |if (${fieldGen.isNull}) {
+           |  throw new NullPointerException("The parameter value for setters in " +
+           |    "`InitializeJavaBean` can not be null");
+           |}
            |$javaBeanInstance.$setterMethod(${fieldGen.value});
          """.stripMargin
     }
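
The setter resolution in the hunk above tries the expression's concrete Java class first and then falls back to `java.lang.Object`, because generic JavaBean-style methods only expose an `Object`-typed parameter after erasure. A standalone sketch of that lookup order using plain Java reflection (illustrative only, not the patch itself):

```scala
// LinkedList.add(E) erases to add(Object), so the typed lookup fails and the
// Object fallback is the one that resolves.
val beanClass = classOf[java.util.LinkedList[Int]]
val paramTypes: Seq[Class[_]] = Seq(classOf[Int], classOf[Object])

val setter = paramTypes.flatMap { paramClass =>
  try Some(beanClass.getDeclaredMethod("add", paramClass))
  catch { case _: NoSuchMethodException => None }
}.head

val bean = new java.util.LinkedList[Int]
setter.invoke(bean, Int.box(1))  // bean now contains 1
```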

http://git-wip-us.apache.org/repos/asf/spark/blob/c5c8b544/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 3828f17..a5ecd1b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -55,7 +55,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
 
   protected def checkEvaluation(
       expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
-    val expr = prepareEvaluation(expression)
+    // Make it a method so a fresh expression is obtained every time.
+    def expr = prepareEvaluation(expression)
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
     checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
     checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
@@ -111,12 +112,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
         val errMsg = intercept[T] {
           eval
         }.getMessage
-        if (errMsg != expectedErrMsg) {
+        if (!errMsg.contains(expectedErrMsg)) {
           fail(s"Expected error message is `$expectedErrMsg`, but `$errMsg` found")
         }
       }
     }
-    val expr = prepareEvaluation(expression)
+
+    // Make it a method so a fresh expression is obtained every time.
+    def expr = prepareEvaluation(expression)
     checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
     checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
     if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
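
Changing `expr` from a `val` to a `def` re-runs `prepareEvaluation(expression)` for every check, so the interpreted and codegen paths each get a fresh expression instance rather than sharing one (and the lazily cached state it may hold, such as `resolvedSetters` above). A minimal, self-contained sketch of the val/def difference, using a hypothetical `prepare` stand-in:

```scala
var prepared = 0
def prepare(): String = { prepared += 1; s"expr-$prepared" }

// A val captures one prepared instance that every check then shares.
val sharedExpr = prepare()
Seq(sharedExpr, sharedExpr)   // Seq("expr-1", "expr-1")

// A def re-runs the preparation, yielding a fresh instance per reference.
def freshExpr = prepare()
Seq(freshExpr, freshExpr)     // Seq("expr-2", "expr-3")
```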

http://git-wip-us.apache.org/repos/asf/spark/blob/c5c8b544/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 9bfe291..44fecd6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -128,6 +128,50 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25))
   }
 
+  test("SPARK-23593: InitializeJavaBean should support interpreted execution") {
+    val list = new java.util.LinkedList[Int]()
+    list.add(1)
+
+    val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]),
+      Map("add" -> Literal(1)))
+    checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq()))
+
+    val initializeWithNonexistingMethod = InitializeJavaBean(
+      Literal.fromObject(new java.util.LinkedList[Int]),
+      Map("nonexisting" -> Literal(1)))
+    checkExceptionInExpression[Exception](initializeWithNonexistingMethod,
+      InternalRow.fromSeq(Seq()),
+      """A method named "nonexisting" is not declared in any enclosing class """ +
+        "nor any supertype")
+
+    val initializeWithWrongParamType = InitializeJavaBean(
+      Literal.fromObject(new TestBean),
+      Map("setX" -> Literal("1")))
+    intercept[Exception] {
+      evaluateWithoutCodegen(initializeWithWrongParamType, InternalRow.fromSeq(Seq()))
+    }.getMessage.contains(
+      """A method named "setX" is not declared in any enclosing class """ +
+        "nor any supertype")
+  }
+
+  test("Can not pass in null into setters in InitializeJavaBean") {
+    val initializeBean = InitializeJavaBean(
+      Literal.fromObject(new TestBean),
+      Map("setNonPrimitive" -> Literal(null)))
+    intercept[NullPointerException] {
+      evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
+    }.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")
+    intercept[NullPointerException] {
+      evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
+    }.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")
+
+    val initializeBean2 = InitializeJavaBean(
+      Literal.fromObject(new TestBean),
+      Map("setNonPrimitive" -> Literal("string")))
+    evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
+    evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
+  }
+
   test("SPARK-23585: UnwrapOption should support interpreted execution") {
     val cls = classOf[Option[Int]]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
@@ -278,3 +322,11 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 }
+
+class TestBean extends Serializable {
+  private var x: Int = 0
+
+  def setX(i: Int): Unit = x = i
+  def setNonPrimitive(i: AnyRef): Unit =
+    assert(i != null, "this setter should not be called with null.")
+}

