You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by db...@apache.org on 2019/01/11 19:23:44 UTC

[spark] branch master updated: [SPARK-26551][SQL] Fix schema pruning error when selecting one complex field and having is not null predicate on another one

This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ebf3a  [SPARK-26551][SQL] Fix schema pruning error when selecting one complex field and having is not null predicate on another one
50ebf3a is described below

commit 50ebf3a43b84c8538ec60437189221c2c527990b
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Fri Jan 11 19:23:32 2019 +0000

    [SPARK-26551][SQL] Fix schema pruning error when selecting one complex field and having is not null predicate on another one
    
    ## What changes were proposed in this pull request?
    
    Schema pruning fails when a query selects one complex field and has an `is not null` predicate on another one:
    
    ```scala
    val query = sql("select * from contacts")
      .where("name.middle is not null")
      .select(
        "id",
        "name.first",
        "name.middle",
        "name.last"
      )
      .where("last = 'Jones'")
      .select(count("id"))
    ```
    
    ```
    java.lang.IllegalArgumentException: middle does not exist. Available: last
    [info]   at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:303)
    [info]   at scala.collection.immutable.Map$Map1.getOrElse(Map.scala:119)
    [info]   at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
    [info]   at org.apache.spark.sql.execution.ProjectionOverSchema.$anonfun$getProjection$6(ProjectionOverSchema.scala:58)
    [info]   at scala.Option.map(Option.scala:163)
    [info]   at org.apache.spark.sql.execution.ProjectionOverSchema.getProjection(ProjectionOverSchema.scala:56)
    [info]   at org.apache.spark.sql.execution.ProjectionOverSchema.unapply(ProjectionOverSchema.scala:32)
    [info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning$$anonfun$$nestedInanonfun$buildNewProjection$1$1.applyOrElse(ParquetSchemaPruning.scala:153)
    ```
    
    ## How was this patch tested?
    
    Added tests.
    
    Closes #23474 from viirya/SPARK-26551.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: DB Tsai <d_...@apple.com>
---
 .../datasources/parquet/ParquetSchemaPruning.scala | 34 +++++++++++++++-----
 .../parquet/ParquetSchemaPruningSuite.scala        | 36 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
index 91080b1..840fcae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
@@ -116,10 +116,28 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
     // For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
     // we don't need to read nested fields of `name` struct other than `first` field.
     val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
-      .distinct.partition(_.contentAccessed)
+      .distinct.partition(!_.prunedIfAnyChildAccessed)
 
     optRootFields.filter { opt =>
-      !rootFields.exists(_.field.name == opt.field.name)
+      !rootFields.exists { root =>
+        root.field.name == opt.field.name && {
+          // Checking if current optional root field can be pruned.
+          // For each required root field, we merge it with the optional root field:
+          // 1. If this optional root field has nested fields and any nested field of it is used
+          //    in the query, the merged field type must equal to the optional root field type.
+          //    We can prune this optional root field. For example, for optional root field
+          //    `struct<name:struct<middle:string,last:string>>`, if its field
+          //    `struct<name:struct<last:string>>` is used, we don't need to add this optional
+          //    root field.
+          // 2. If this optional root field has no nested fields, the merged field type equals
+          //    to the optional root field only if they are the same. If they are, we can prune
+          //    this optional root field too.
+          val rootFieldType = StructType(Array(root.field))
+          val optFieldType = StructType(Array(opt.field))
+          val merged = optFieldType.merge(rootFieldType)
+          merged.sameType(optFieldType)
+        }
+      }
     } ++ rootFields
   }
 
@@ -213,11 +231,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
       // don't actually use any nested fields. These root field accesses might be excluded later
       // if there are any nested fields accesses in the query plan.
       case IsNotNull(SelectedField(field)) =>
-        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+        RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
       case IsNull(SelectedField(field)) =>
-        RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+        RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
       case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
-        expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
+        expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true))
       case _ =>
         expr.children.flatMap(getRootFields)
     }
@@ -271,9 +289,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
   /**
    * This represents a "root" schema field (aka top-level, no-parent). `field` is the
    * `StructField` for field name and datatype. `derivedFromAtt` indicates whether it
-   * was derived from an attribute or had a proper child. `contentAccessed` means whether
-   * it was accessed with its content by the expressions refer it.
+   * was derived from an attribute or had a proper child. `prunedIfAnyChildAccessed` means
+   * whether this root field can be pruned if any of child field is used in the query.
    */
   private case class RootField(field: StructField, derivedFromAtt: Boolean,
-    contentAccessed: Boolean = true)
+    prunedIfAnyChildAccessed: Boolean = false)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index 9a02529..4d15f38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.SchemaPruningTest
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType
@@ -217,6 +218,41 @@ class ParquetSchemaPruningSuite
       Row("Y.") :: Nil)
   }
 
+  testSchemaPruning("select one complex field and having is null predicate on another " +
+      "complex field") {
+    val query = sql("select * from contacts")
+      .where("name.middle is not null")
+      .select(
+        "id",
+        "name.first",
+        "name.middle",
+        "name.last"
+      )
+      .where("last = 'Jones'")
+      .select(count("id")).toDF()
+    checkScan(query,
+      "struct<id:int,name:struct<middle:string,last:string>>")
+    checkAnswer(query, Row(0) :: Nil)
+  }
+
+  testSchemaPruning("select one deep nested complex field and having is null predicate on " +
+      "another deep nested complex field") {
+    val query = sql("select * from contacts")
+      .where("employer.company.address is not null")
+      .selectExpr(
+        "id",
+        "name.first",
+        "name.middle",
+        "name.last",
+        "employer.id as employer_id"
+      )
+      .where("employer_id = 0")
+      .select(count("id")).toDF()
+    checkScan(query,
+      "struct<id:int,employer:struct<id:int,company:struct<address:string>>>")
+    checkAnswer(query, Row(1) :: Nil)
+  }
+
   private def testSchemaPruning(testName: String)(testThunk: => Unit) {
     test(s"Spark vectorized reader - without partition data column - $testName") {
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org