You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/06/18 00:25:35 UTC
[spark] branch branch-3.3 updated: [SPARK-39496][SQL] Handle null struct in `Inline.eval`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 1dea5746fe5 [SPARK-39496][SQL] Handle null struct in `Inline.eval`
1dea5746fe5 is described below
commit 1dea5746fe5af42b121f5500d0f6c0b1a7947b88
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Sat Jun 18 09:25:11 2022 +0900
[SPARK-39496][SQL] Handle null struct in `Inline.eval`
### What changes were proposed in this pull request?
Change `Inline.eval` to return a row of null values rather than a null row in the case of a null input struct.
### Why are the changes needed?
Consider the following query:
```
set spark.sql.codegen.wholeStage=false;
select inline(array(named_struct('a', 1, 'b', 2), null));
```
This query fails with a `NullPointerException`:
```
22/06/16 15:10:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$11(GenerateExec.scala:122)
```
(In Spark 3.1.3, you don't need to set `spark.sql.codegen.wholeStage` to false to reproduce the error, since Spark 3.1.3 has no codegen path for `Inline`.)
The following query fails regardless of the setting of `spark.sql.codegen.wholeStage`:
```
val dfWide = (Seq((1))
.toDF("col0")
.selectExpr(Seq.tabulate(99)(x => s"$x as col${x + 1}"): _*))
val df = (dfWide
.selectExpr("*", "array(named_struct('a', 1, 'b', 2), null) as struct_array"))
df.selectExpr("*", "inline(struct_array)").collect
```
It fails with
```
22/06/16 15:18:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_8$(Unknown Source)
```
When `Inline.eval` returns a null row in the collection, `GenerateExec` throws a `NullPointerException` either when joining the null row with the required child output or when projecting the null row.
This PR avoids producing the null row and produces a row of null values instead:
```
spark-sql> set spark.sql.codegen.wholeStage=false;
spark.sql.codegen.wholeStage false
Time taken: 3.095 seconds, Fetched 1 row(s)
spark-sql> select inline(array(named_struct('a', 1, 'b', 2), null));
1 2
NULL NULL
Time taken: 1.214 seconds, Fetched 2 row(s)
spark-sql>
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test.
Closes #36903 from bersprockets/inline_eval_null_struct_issue.
Authored-by: Bruce Robbins <be...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit c4d5390dd032d17a40ad50e38f0ed7bd9bbd4698)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../apache/spark/sql/catalyst/expressions/generators.scala | 8 ++++++--
.../scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 13 ++++++++++++-
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index 1079f0a333d..d305b4d3700 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -452,13 +452,17 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene
private lazy val numFields = elementSchema.fields.length
+ private lazy val generatorNullRow = new GenericInternalRow(elementSchema.length)
+
override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
val inputArray = child.eval(input).asInstanceOf[ArrayData]
if (inputArray == null) {
Nil
} else {
- for (i <- 0 until inputArray.numElements())
- yield inputArray.getStruct(i, numFields)
+ for (i <- 0 until inputArray.numElements()) yield {
+ val s = inputArray.getStruct(i, numFields)
+ if (s == null) generatorNullRow else s
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index ef87e10946b..09afedea7a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.LeafLike
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -389,7 +390,7 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
}
}
- test("SPARK-39061: inline should handle null struct") {
+ def testNullStruct(): Unit = {
val df = sql(
"""select * from values
|(
@@ -413,6 +414,16 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
sql("select a, inline(b) from t1"),
Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil)
}
+
+ test("SPARK-39061: inline should handle null struct") {
+ testNullStruct
+ }
+
+ test("SPARK-39496: inline eval path should handle null struct") {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ testNullStruct
+ }
+ }
}
case class EmptyGenerator() extends Generator with LeafLike[Expression] {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org