Posted to commits@spark.apache.org by gu...@apache.org on 2020/06/01 11:10:48 UTC

[spark] branch branch-2.4 updated: [SPARK-31854][SQL][2.4] Invoke in MapElementsExec should not propagate null

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6f6dcc8  [SPARK-31854][SQL][2.4] Invoke in MapElementsExec should not propagate null
6f6dcc8 is described below

commit 6f6dcc8eccacd3a567c08789c53ceae8fbc124de
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Mon Jun 1 20:09:09 2020 +0900

    [SPARK-31854][SQL][2.4] Invoke in MapElementsExec should not propagate null
    
    ### What changes were proposed in this pull request?
    
    This PR intends to fix a bug in `Dataset.map` below when whole-stage codegen is enabled:
    ```
    scala> val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()
    
    scala> sql("SET spark.sql.codegen.wholeStage=true")
    
    scala> ds.map(v=>(v,v)).explain
    == Physical Plan ==
    *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1.intValue AS _1#69, assertnotnull(input[0, scala.Tuple2, true])._2.intValue AS _2#70]
    +- *(1) MapElements <function1>, obj#68: scala.Tuple2
       +- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#1, true, false), obj#67: java.lang.Integer
          +- LocalTableScan [value#1]
    
    // `AssertNotNull` in `SerializeFromObject` will fail;
    scala> ds.map(v => (v, v)).show()
    java.lang.NullPointerException: Null value appeared in non-nullable field:
    top level Product input object
    If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
    
    // When whole-stage codegen is disabled, the query works well:
    scala> sql("SET spark.sql.codegen.wholeStage=false")
    scala> ds.map(v=>(v,v)).show()
    +----+----+
    |  _1|  _2|
    +----+----+
    |   1|   1|
    |null|null|
    +----+----+
    ```
    The root cause is that `Invoke`, as used in `MapElementsExec`, propagates a null input, and then [AssertNotNull](https://github.com/apache/spark/blob/1b780f364bfbb46944fe805a024bb6c32f5d2dde/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L253-L255) in `SerializeFromObject` fails because the top-level row becomes null. So, `MapElementsExec` should return `(null, null)` rather than `null`.
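
    For illustration, here is a minimal Scala sketch of what `propagateNull` changes (the `invoke` helper is hypothetical, not Spark's actual codegen):
    ```
    // propagateNull = true: a null input short-circuits the call, so the
    // result object itself becomes null and downstream `AssertNotNull` fails.
    // propagateNull = false: the user function is always invoked, so
    // `v => (v, v)` maps a null input to `(null, null)` as expected.
    def invoke[T <: AnyRef, R](func: T => R, input: T, propagateNull: Boolean): R =
      if (propagateNull && input == null) null.asInstanceOf[R]
      else func(input)

    val f = (v: Integer) => (v, v)
    invoke(f, null, propagateNull = true)   // null
    invoke(f, null, propagateNull = false)  // (null,null)
    ```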
    
    NOTE: the generated code for the query above in the current master:
    ```
    /* 033 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
    /* 034 */     boolean mapelements_isNull_1 = true;
    /* 035 */     scala.Tuple2 mapelements_value_1 = null;
    /* 036 */     if (!false) {
    /* 037 */       mapelements_resultIsNull_0 = false;
    /* 038 */
    /* 039 */       if (!mapelements_resultIsNull_0) {
    /* 040 */         mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
    /* 041 */         mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
    /* 042 */       }
    /* 043 */
    /* 044 */       mapelements_isNull_1 = mapelements_resultIsNull_0;
    /* 045 */       if (!mapelements_isNull_1) {
    /* 046 */         Object mapelements_funcResult_0 = null;
    /* 047 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
    /* 048 */
    /* 049 */         if (mapelements_funcResult_0 != null) {
    /* 050 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
    /* 051 */         } else {
    /* 052 */           mapelements_isNull_1 = true;
    /* 053 */         }
    /* 054 */
    /* 055 */       }
    /* 056 */     }
    /* 057 */
    /* 058 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
    /* 059 */
    /* 060 */   }
    ```
    
    The generated code with this fix:
    ```
    /* 032 */   private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
    /* 033 */     boolean mapelements_isNull_1 = true;
    /* 034 */     scala.Tuple2 mapelements_value_1 = null;
    /* 035 */     if (!false) {
    /* 036 */       mapelements_mutableStateArray_0[0] = mapelements_expr_0_0;
    /* 037 */
    /* 038 */       mapelements_isNull_1 = false;
    /* 039 */       if (!mapelements_isNull_1) {
    /* 040 */         Object mapelements_funcResult_0 = null;
    /* 041 */         mapelements_funcResult_0 = ((scala.Function1) references[1] /* literal */).apply(mapelements_mutableStateArray_0[0]);
    /* 042 */
    /* 043 */         if (mapelements_funcResult_0 != null) {
    /* 044 */           mapelements_value_1 = (scala.Tuple2) mapelements_funcResult_0;
    /* 045 */           mapelements_isNull_1 = false;
    /* 046 */         } else {
    /* 047 */           mapelements_isNull_1 = true;
    /* 048 */         }
    /* 049 */
    /* 050 */       }
    /* 051 */     }
    /* 052 */
    /* 053 */     serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
    /* 054 */
    /* 055 */   }
    ```
    
    This is a backport of https://github.com/apache/spark/pull/28681.
    
    ### Why are the changes needed?
    
    Bugfix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests.
    
    Closes #28691 from maropu/SPARK-31854-BRANCH2.4.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../main/scala/org/apache/spark/sql/execution/objects.scala    |  3 ++-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala     | 10 ++++++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 03d1bbf..27673fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -217,7 +217,8 @@ case class MapElementsExec(
       case _ => FunctionUtils.getFunctionOneName(outputObjAttr.dataType, child.output(0).dataType)
     }
     val funcObj = Literal.create(func, ObjectType(funcClass))
-    val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output)
+    val callFunc = Invoke(funcObj, methodName, outputObjAttr.dataType, child.output,
+      propagateNull = false)
 
     val result = BindReferences.bindReference(callFunc, child.output).genCode(ctx)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 01d0877..08ebf8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1576,6 +1576,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     }
     assert(thrownException.message.contains("Cannot up cast `id` from bigint to tinyint"))
   }
+
+  test("SPARK-31854: Invoke in MapElementsExec should not propagate null") {
+    Seq("true", "false").foreach { wholeStage =>
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStage) {
+        val ds = Seq(1.asInstanceOf[Integer], null.asInstanceOf[Integer]).toDS()
+        val expectedAnswer = Seq[(Integer, Integer)]((1, 1), (null, null))
+        checkDataset(ds.map(v => (v, v)), expectedAnswer: _*)
+      }
+    }
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)

