Posted to commits@spark.apache.org by vi...@apache.org on 2022/04/22 21:12:20 UTC

[spark] branch branch-3.3 updated: [SPARK-38977][SQL] Fix schema pruning with correlated subqueries

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9c5f38d8085 [SPARK-38977][SQL] Fix schema pruning with correlated subqueries
9c5f38d8085 is described below

commit 9c5f38d808573ced34bb52bdf4c5102ff2d1a7e2
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Apr 22 14:11:47 2022 -0700

    [SPARK-38977][SQL] Fix schema pruning with correlated subqueries
    
    ### What changes were proposed in this pull request?
    
    This PR fixes schema pruning for queries with multiple correlated subqueries. Previously, Spark could throw an exception while determining root fields in `SchemaPruning$identifyRootFields`. This happened because predicate expressions that referenced attributes inside subqueries were not ignored, so attributes coming from different subqueries could conflict with each other (e.g. have incompatible data types) even though they should never have been considered when pruning the outer relation's schema.
    
    For instance, the following query would fail with a runtime exception:
    
    ```
    SELECT name FROM contacts c
    WHERE
     EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
     AND
     EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
    ```
    ```
    [info]   org.apache.spark.SparkException: Failed to merge fields 'value' and 'value'. Failed to merge incompatible data types int and string
    [info]   at org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingFieldsError(QueryExecutionErrors.scala:936)
    ```
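    
    For reference, the failure before this fix can be reproduced with a spark-shell (Scala) sketch roughly like the one below; the nested `contacts` layout is a simplified version of the suite's test schema, and the Parquet path and sample rows are only illustrative:
    
    ```
    import spark.implicits._
    
    case class Name(first: String, middle: String, last: String)
    case class Contact(id: Int, name: Name, pets: Int)
    
    // Write a small nested dataset as Parquet so the file-source schema pruning
    // rule applies (assumes the default spark.sql.optimizer.nestedSchemaPruning.enabled=true).
    Seq(Contact(0, Name("Jane", "X.", "Doe"), 1), Contact(1, Name("John", "Y.", "Doe"), 3))
      .toDF().write.mode("overwrite").parquet("/tmp/contacts")
    spark.read.parquet("/tmp/contacts").createOrReplaceTempView("contacts")
    
    Seq(1, 2, 3).toDF("value").createOrReplaceTempView("ids")
    Seq("John", "Bob").toDF("value").createOrReplaceTempView("first_names")
    
    // Before this fix, planning the query below failed in
    // SchemaPruning.identifyRootFields with
    // "Failed to merge incompatible data types int and string".
    spark.sql(
      """SELECT name FROM contacts c
        |WHERE
        |  EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
        |  AND
        |  EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
        |""".stripMargin).show()
    ```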
    
    ### Why are the changes needed?
    
    These changes are needed to avoid exceptions for some queries with multiple correlated subqueries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR comes with tests.
    
    Closes #36303 from aokolnychyi/spark-38977.
    
    Authored-by: Anton Okolnychyi <ao...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
    (cherry picked from commit 0c9947dabcb71de414c97c0e60a1067e468f2642)
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../sql/catalyst/expressions/SchemaPruning.scala   |   4 +
 .../execution/datasources/SchemaPruningSuite.scala | 102 +++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
index fd5b2db61f3..e14bcba0ace 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
@@ -152,6 +152,10 @@ object SchemaPruning extends SQLConfHelper {
         RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
       case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
         expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true))
+      case s: SubqueryExpression =>
+        // use subquery references that only include outer attrs and
+        // ignore join conditions as those may include attributes from other tables
+        s.references.toSeq.flatMap(getRootFields)
       case _ =>
         expr.children.flatMap(getRootFields)
     }
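
The added `SubqueryExpression` case recurses only over `s.references`, i.e. the outer attributes the subquery is correlated on, rather than descending into the subquery's own condition. The toy model below (hypothetical, simplified classes rather than Spark's real `Expression` API) sketches why that keeps the two unrelated `value` attributes from ever reaching the field merge:

```
// Toy stand-ins for expressions; not Spark's actual classes.
trait Expr { def children: Seq[Expr] }

case class Attr(table: String, name: String, dataType: String) extends Expr {
  val children: Seq[Expr] = Nil
}
case class EqualTo(left: Expr, right: Expr) extends Expr {
  val children: Seq[Expr] = Seq(left, right)
}
// `outerRefs` models SubqueryExpression.references: only the outer query's
// attributes that the subquery is correlated on.
case class Exists(condition: Expr, outerRefs: Seq[Attr]) extends Expr {
  val children: Seq[Expr] = Seq(condition)
}

// Before the fix: descend into everything, including the subquery condition.
def attrsViaChildren(e: Expr): Seq[Attr] = e match {
  case a: Attr => Seq(a)
  case other   => other.children.flatMap(attrsViaChildren)
}

// After the fix: stop at the subquery boundary and keep only outer references.
def attrsViaOuterRefs(e: Expr): Seq[Attr] = e match {
  case a: Attr   => Seq(a)
  case s: Exists => s.outerRefs.flatMap(attrsViaOuterRefs)
  case other     => other.children.flatMap(attrsViaOuterRefs)
}

val cId    = Attr("contacts", "id", "int")
val cFirst = Attr("contacts", "name.first", "string")
val iValue = Attr("ids", "value", "int")
val nValue = Attr("first_names", "value", "string")

val predicates = Seq(
  Exists(EqualTo(iValue, cId), outerRefs = Seq(cId)),
  Exists(EqualTo(cFirst, nValue), outerRefs = Seq(cFirst)))

// Old behavior surfaces two unrelated `value` columns with clashing types,
// which is what made the later field merge fail.
println(predicates.flatMap(attrsViaChildren).map(a => s"${a.name}: ${a.dataType}"))
// List(value: int, id: int, name.first: string, value: string)

// New behavior keeps only the outer attributes that matter for pruning.
println(predicates.flatMap(attrsViaOuterRefs).map(a => s"${a.name}: ${a.dataType}"))
// List(id: int, name.first: string)
```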
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 2c227baa04f..1d62713331f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -935,4 +935,106 @@ abstract class SchemaPruningSuite
       .count()
     assert(count == 0)
   }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated EXISTS subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+          |  AND
+          |  EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
+          |""".stripMargin)
+
+      checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
+
+      checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated NOT EXISTS subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  NOT EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+          |  AND
+          |  NOT EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
+          |""".stripMargin)
+
+      checkScan(query, "struct<id:int,name:struct<first:string,middle:string,last:string>>")
+
+      checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated IN subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  id IN (SELECT * FROM ids i WHERE c.pets > i.value)
+          |  AND
+          |  name.first IN (SELECT * FROM first_names n WHERE c.name.last < n.value)
+          |""".stripMargin)
+
+      checkScan(query,
+        "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
+
+      checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated NOT IN subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Janet", "Jim", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  id NOT IN (SELECT * FROM ids i WHERE c.pets > i.value)
+          |  AND
+          |  name.first NOT IN (SELECT * FROM first_names n WHERE c.name.last > n.value)
+          |""".stripMargin)
+
+      checkScan(query,
+        "struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
+
+      checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
+    }
+  }
 }

