You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by db...@apache.org on 2019/01/11 19:23:44 UTC
[spark] branch master updated: [SPARK-26551][SQL] Fix schema
pruning error when selecting one complex field and having is not null
predicate on another one
This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 50ebf3a [SPARK-26551][SQL] Fix schema pruning error when selecting one complex field and having is not null predicate on another one
50ebf3a is described below
commit 50ebf3a43b84c8538ec60437189221c2c527990b
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Fri Jan 11 19:23:32 2019 +0000
[SPARK-26551][SQL] Fix schema pruning error when selecting one complex field and having is not null predicate on another one
## What changes were proposed in this pull request?
Schema pruning fails when a query selects one complex field and applies an `is not null` predicate to another one:
```scala
val query = sql("select * from contacts")
.where("name.middle is not null")
.select(
"id",
"name.first",
"name.middle",
"name.last"
)
.where("last = 'Jones'")
.select(count("id"))
```
```
java.lang.IllegalArgumentException: middle does not exist. Available: last
[info] at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:303)
[info] at scala.collection.immutable.Map$Map1.getOrElse(Map.scala:119)
[info] at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
[info] at org.apache.spark.sql.execution.ProjectionOverSchema.$anonfun$getProjection$6(ProjectionOverSchema.scala:58)
[info] at scala.Option.map(Option.scala:163)
[info] at org.apache.spark.sql.execution.ProjectionOverSchema.getProjection(ProjectionOverSchema.scala:56)
[info] at org.apache.spark.sql.execution.ProjectionOverSchema.unapply(ProjectionOverSchema.scala:32)
[info] at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning$$anonfun$$nestedInanonfun$buildNewProjection$1$1.applyOrElse(ParquetSchemaPruning.scala:153)
```
## How was this patch tested?
Added tests.
Closes #23474 from viirya/SPARK-26551.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: DB Tsai <d_...@apple.com>
---
.../datasources/parquet/ParquetSchemaPruning.scala | 34 +++++++++++++++-----
.../parquet/ParquetSchemaPruningSuite.scala | 36 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
index 91080b1..840fcae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
@@ -116,10 +116,28 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
// For example, for a query `SELECT name.first FROM contacts WHERE name IS NOT NULL`,
// we don't need to read nested fields of `name` struct other than `first` field.
val (rootFields, optRootFields) = (projectionRootFields ++ filterRootFields)
- .distinct.partition(_.contentAccessed)
+ .distinct.partition(!_.prunedIfAnyChildAccessed)
optRootFields.filter { opt =>
- !rootFields.exists(_.field.name == opt.field.name)
+ !rootFields.exists { root =>
+ root.field.name == opt.field.name && {
+ // Checking if current optional root field can be pruned.
+ // For each required root field, we merge it with the optional root field:
+ // 1. If this optional root field has nested fields and any nested field of it is used
+ // in the query, the merged field type must equal the optional root field type.
+ // We can prune this optional root field. For example, for optional root field
+ // `struct<name:struct<middle:string,last:string>>`, if its field
+ // `struct<name:struct<last:string>>` is used, we don't need to add this optional
+ // root field.
+ // 2. If this optional root field has no nested fields, the merged field type equals
+ // the optional root field type only if they are the same. If they are, we can prune
+ // this optional root field too.
+ val rootFieldType = StructType(Array(root.field))
+ val optFieldType = StructType(Array(opt.field))
+ val merged = optFieldType.merge(rootFieldType)
+ merged.sameType(optFieldType)
+ }
+ }
} ++ rootFields
}
@@ -213,11 +231,11 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
// don't actually use any nested fields. These root field accesses might be excluded later
// if there are any nested fields accesses in the query plan.
case IsNotNull(SelectedField(field)) =>
- RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+ RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
case IsNull(SelectedField(field)) =>
- RootField(field, derivedFromAtt = false, contentAccessed = false) :: Nil
+ RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = true) :: Nil
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
- expr.children.flatMap(getRootFields).map(_.copy(contentAccessed = false))
+ expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = true))
case _ =>
expr.children.flatMap(getRootFields)
}
@@ -271,9 +289,9 @@ private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
/**
* This represents a "root" schema field (aka top-level, no-parent). `field` is the
* `StructField` for field name and datatype. `derivedFromAtt` indicates whether it
- * was derived from an attribute or had a proper child. `contentAccessed` means whether
- * it was accessed with its content by the expressions refer it.
+ * was derived from an attribute or had a proper child. `prunedIfAnyChildAccessed` means
+ * whether this root field can be pruned if any of its child fields is used in the query.
*/
private case class RootField(field: StructField, derivedFromAtt: Boolean,
- contentAccessed: Boolean = true)
+ prunedIfAnyChildAccessed: Boolean = false)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index 9a02529..4d15f38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.SchemaPruningTest
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
@@ -217,6 +218,41 @@ class ParquetSchemaPruningSuite
Row("Y.") :: Nil)
}
+ testSchemaPruning("select one complex field and having is null predicate on another " +
+ "complex field") {
+ val query = sql("select * from contacts")
+ .where("name.middle is not null")
+ .select(
+ "id",
+ "name.first",
+ "name.middle",
+ "name.last"
+ )
+ .where("last = 'Jones'")
+ .select(count("id")).toDF()
+ checkScan(query,
+ "struct<id:int,name:struct<middle:string,last:string>>")
+ checkAnswer(query, Row(0) :: Nil)
+ }
+
+ testSchemaPruning("select one deep nested complex field and having is null predicate on " +
+ "another deep nested complex field") {
+ val query = sql("select * from contacts")
+ .where("employer.company.address is not null")
+ .selectExpr(
+ "id",
+ "name.first",
+ "name.middle",
+ "name.last",
+ "employer.id as employer_id"
+ )
+ .where("employer_id = 0")
+ .select(count("id")).toDF()
+ checkScan(query,
+ "struct<id:int,employer:struct<id:int,company:struct<address:string>>>")
+ checkAnswer(query, Row(1) :: Nil)
+ }
+
private def testSchemaPruning(testName: String)(testThunk: => Unit) {
test(s"Spark vectorized reader - without partition data column - $testName") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org