You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/04/05 11:39:50 UTC
spark git commit: [SPARK-23593][SQL] Add interpreted execution for
InitializeJavaBean expression
Repository: spark
Updated Branches:
refs/heads/master d3bd0435e -> c5c8b5440
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean expression
## What changes were proposed in this pull request?
Add interpreted execution for `InitializeJavaBean` expression.
## How was this patch tested?
Added unit test.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #20756 from viirya/SPARK-23593.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5c8b544
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5c8b544
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5c8b544
Branch: refs/heads/master
Commit: c5c8b544047a83cb6128a20d31f1d943a15f9260
Parents: d3bd043
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Apr 5 13:39:45 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Apr 5 13:39:45 2018 +0200
----------------------------------------------------------------------
.../catalyst/expressions/objects/objects.scala | 47 +++++++++++++++++-
.../expressions/ExpressionEvalHelper.scala | 9 ++--
.../expressions/ObjectExpressionsSuite.scala | 52 ++++++++++++++++++++
3 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c5c8b544/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index a455c1c..20c4f4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1410,8 +1410,47 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
override def children: Seq[Expression] = beanInstance +: setters.values.toSeq
override def dataType: DataType = beanInstance.dataType
- override def eval(input: InternalRow): Any =
- throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
+ private lazy val resolvedSetters = {
+ assert(beanInstance.dataType.isInstanceOf[ObjectType])
+
+ val ObjectType(beanClass) = beanInstance.dataType
+ setters.map {
+ case (name, expr) =>
+ // First look for a setter whose parameter matches the known type mapping;
+ + // also try a general `Object`-typed parameter to cover generic methods.
+ val paramTypes = ScalaReflection.expressionJavaClasses(Seq(expr)) ++ Seq(classOf[Object])
+ val methods = paramTypes.flatMap { fieldClass =>
+ try {
+ Some(beanClass.getDeclaredMethod(name, fieldClass))
+ } catch {
+ case e: NoSuchMethodException => None
+ }
+ }
+ if (methods.isEmpty) {
+ throw new NoSuchMethodException(s"""A method named "$name" is not declared """ +
+ "in any enclosing class nor any supertype")
+ }
+ methods.head -> expr
+ }
+ }
+
+ override def eval(input: InternalRow): Any = {
+ val instance = beanInstance.eval(input)
+ if (instance != null) {
+ val bean = instance.asInstanceOf[Object]
+ resolvedSetters.foreach {
+ case (setter, expr) =>
+ val paramVal = expr.eval(input)
+ if (paramVal == null) {
+ throw new NullPointerException("The parameter value for setters in " +
+ "`InitializeJavaBean` can not be null")
+ } else {
+ setter.invoke(bean, paramVal.asInstanceOf[AnyRef])
+ }
+ }
+ }
+ instance
+ }
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val instanceGen = beanInstance.genCode(ctx)
@@ -1424,6 +1463,10 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
val fieldGen = fieldValue.genCode(ctx)
s"""
|${fieldGen.code}
+ |if (${fieldGen.isNull}) {
+ | throw new NullPointerException("The parameter value for setters in " +
+ | "`InitializeJavaBean` can not be null");
+ |}
|$javaBeanInstance.$setterMethod(${fieldGen.value});
""".stripMargin
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c5c8b544/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 3828f17..a5ecd1b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -55,7 +55,8 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
protected def checkEvaluation(
expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
- val expr = prepareEvaluation(expression)
+ // Make this a method so that a fresh expression is obtained every time.
+ + def expr = prepareEvaluation(expression)
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
@@ -111,12 +112,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
val errMsg = intercept[T] {
eval
}.getMessage
- if (errMsg != expectedErrMsg) {
+ if (!errMsg.contains(expectedErrMsg)) {
fail(s"Expected error message is `$expectedErrMsg`, but `$errMsg` found")
}
}
}
- val expr = prepareEvaluation(expression)
+
+ // Make this a method so that a fresh expression is obtained every time.
+ + def expr = prepareEvaluation(expression)
checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
http://git-wip-us.apache.org/repos/asf/spark/blob/c5c8b544/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 9bfe291..44fecd6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -128,6 +128,50 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
Invoke(funcSubObj, "binOp", DoubleType, inputSum), 0.75, InternalRow.apply(1, 0.25))
}
+ test("SPARK-23593: InitializeJavaBean should support interpreted execution") {
+ val list = new java.util.LinkedList[Int]()
+ list.add(1)
+
+ val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]),
+ Map("add" -> Literal(1)))
+ checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq()))
+
+ val initializeWithNonexistingMethod = InitializeJavaBean(
+ Literal.fromObject(new java.util.LinkedList[Int]),
+ Map("nonexisting" -> Literal(1)))
+ checkExceptionInExpression[Exception](initializeWithNonexistingMethod,
+ InternalRow.fromSeq(Seq()),
+ """A method named "nonexisting" is not declared in any enclosing class """ +
+ "nor any supertype")
+
+ val initializeWithWrongParamType = InitializeJavaBean(
+ Literal.fromObject(new TestBean),
+ Map("setX" -> Literal("1")))
+ intercept[Exception] {
+ evaluateWithoutCodegen(initializeWithWrongParamType, InternalRow.fromSeq(Seq()))
+ }.getMessage.contains(
+ """A method named "setX" is not declared in any enclosing class """ +
+ "nor any supertype")
+ }
+
+ test("Can not pass in null into setters in InitializeJavaBean") {
+ val initializeBean = InitializeJavaBean(
+ Literal.fromObject(new TestBean),
+ Map("setNonPrimitive" -> Literal(null)))
+ intercept[NullPointerException] {
+ evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
+ }.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")
+ intercept[NullPointerException] {
+ evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
+ }.getMessage.contains("The parameter value for setters in `InitializeJavaBean` can not be null")
+
+ val initializeBean2 = InitializeJavaBean(
+ Literal.fromObject(new TestBean),
+ Map("setNonPrimitive" -> Literal("string")))
+ evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
+ evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
+ }
+
test("SPARK-23585: UnwrapOption should support interpreted execution") {
val cls = classOf[Option[Int]]
val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
@@ -278,3 +322,11 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}
}
+
+class TestBean extends Serializable {
+ private var x: Int = 0
+
+ def setX(i: Int): Unit = x = i
+ def setNonPrimitive(i: AnyRef): Unit =
+ assert(i != null, "this setter should not be called with null.")
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org