You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/02/10 17:24:07 UTC
[spark] branch branch-3.0 updated: Revert "[SPARK-29721][SQL] Prune
unnecessary nested fields from Generate without Project
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ff395a3 Revert "[SPARK-29721][SQL] Prune unnecessary nested fields from Generate without Project
ff395a3 is described below
commit ff395a39a5b10a7e71ef61813084bd3cf120280c
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Sun Feb 9 19:45:16 2020 -0800
Revert "[SPARK-29721][SQL] Prune unnecessary nested fields from Generate without Project"
This reverts commit a0e63b61e7c5d55ae2a9213b95ab1e87ac7c203c.
### What changes were proposed in this pull request?
This reverts patch #26978, following gatorsmile's suggestion.
### Why are the changes needed?
The original patch #26978 did not consider a corner case. We need to spend more time ensuring that all cases are covered.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Unit test.
Closes #27504 from viirya/revert-SPARK-29721.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Xiao Li <ga...@gmail.com>
---
.../catalyst/optimizer/NestedColumnAliasing.scala | 47 ----------------------
.../spark/sql/catalyst/optimizer/Optimizer.scala | 43 +++++++++++---------
.../execution/datasources/SchemaPruningSuite.scala | 32 ---------------
3 files changed, 25 insertions(+), 97 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index ea85014..43a6006 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -155,53 +155,6 @@ object NestedColumnAliasing {
case MapType(keyType, valueType, _) => totalFieldNum(keyType) + totalFieldNum(valueType)
case _ => 1 // UDT and others
}
-}
-
-/**
- * This prunes unnessary nested columns from `Generate` and optional `Project` on top
- * of it.
- */
-object GeneratorNestedColumnAliasing {
- def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
- // Either `nestedPruningOnExpressions` or `nestedSchemaPruningEnabled` is enabled, we
- // need to prune nested columns through Project and under Generate. The difference is
- // when `nestedSchemaPruningEnabled` is on, nested columns will be pruned further at
- // file format readers if it is supported.
- case Project(projectList, g: Generate) if (SQLConf.get.nestedPruningOnExpressions ||
- SQLConf.get.nestedSchemaPruningEnabled) && canPruneGenerator(g.generator) =>
- // On top on `Generate`, a `Project` that might have nested column accessors.
- // We try to get alias maps for both project list and generator's children expressions.
- NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map {
- case (nestedFieldToAlias, attrToAliases) =>
- val newChild = pruneGenerate(g, nestedFieldToAlias, attrToAliases)
- Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild)
- }
-
- case g: Generate if SQLConf.get.nestedSchemaPruningEnabled &&
- canPruneGenerator(g.generator) =>
- NestedColumnAliasing.getAliasSubMap(g.generator.children).map {
- case (nestedFieldToAlias, attrToAliases) =>
- pruneGenerate(g, nestedFieldToAlias, attrToAliases)
- }
-
- case _ =>
- None
- }
-
- private def pruneGenerate(
- g: Generate,
- nestedFieldToAlias: Map[ExtractValue, Alias],
- attrToAliases: Map[ExprId, Seq[Alias]]): LogicalPlan = {
- val newGenerator = g.generator.transform {
- case f: ExtractValue if nestedFieldToAlias.contains(f) =>
- nestedFieldToAlias(f).toAttribute
- }.asInstanceOf[Generator]
-
- // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`.
- val newGenerate = g.copy(generator = newGenerator)
-
- NestedColumnAliasing.replaceChildrenWithAliases(newGenerate, attrToAliases)
- }
/**
* This is a while-list for pruning nested fields at `Generator`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c90117b..08acac1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -601,24 +601,31 @@ object ColumnPruning extends Rule[LogicalPlan] {
s.copy(child = prunedChild(child, s.references))
// prune unrequired references
- case p @ Project(_, g: Generate) =>
- val currP = if (p.references != g.outputSet) {
- val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
- val newChild = prunedChild(g.child, requiredAttrs)
- val unrequired = g.generator.references -- p.references
- val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
- .map(_._2)
- p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
- } else {
- p
- }
- // If we can prune nested column on Project + Generate, do it now.
- // Otherwise by transforming down to Generate, it could be pruned individually,
- // and causes nested column on top Project unable to resolve.
- GeneratorNestedColumnAliasing.unapply(currP).getOrElse(currP)
-
- // prune unrequired nested fields from `Generate`.
- case GeneratorNestedColumnAliasing(p) => p
+ case p @ Project(_, g: Generate) if p.references != g.outputSet =>
+ val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
+ val newChild = prunedChild(g.child, requiredAttrs)
+ val unrequired = g.generator.references -- p.references
+ val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
+ .map(_._2)
+ p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
+
+ // prune unrequired nested fields
+ case p @ Project(projectList, g: Generate) if SQLConf.get.nestedPruningOnExpressions &&
+ NestedColumnAliasing.canPruneGenerator(g.generator) =>
+ NestedColumnAliasing.getAliasSubMap(projectList ++ g.generator.children).map {
+ case (nestedFieldToAlias, attrToAliases) =>
+ val newGenerator = g.generator.transform {
+ case f: ExtractValue if nestedFieldToAlias.contains(f) =>
+ nestedFieldToAlias(f).toAttribute
+ }.asInstanceOf[Generator]
+
+ // Defer updating `Generate.unrequiredChildIndex` to next round of `ColumnPruning`.
+ val newGenerate = g.copy(generator = newGenerator)
+
+ val newChild = NestedColumnAliasing.replaceChildrenWithAliases(newGenerate, attrToAliases)
+
+ Project(NestedColumnAliasing.getNewProjectList(projectList, nestedFieldToAlias), newChild)
+ }.getOrElse(p)
// Eliminate unneeded attributes from right side of a Left Existence Join.
case j @ Join(_, right, LeftExistence(_), _, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 5977e86..a3d4905 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -301,38 +301,6 @@ abstract class SchemaPruningSuite
checkAnswer(query, Row("Y.", 1) :: Row("X.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
}
- testSchemaPruning("select explode of nested field of array of struct") {
- // Config combinations
- val configs = Seq((true, true), (true, false), (false, true), (false, false))
-
- configs.foreach { case (nestedPruning, nestedPruningOnExpr) =>
- withSQLConf(
- SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> nestedPruning.toString,
- SQLConf.NESTED_PRUNING_ON_EXPRESSIONS.key -> nestedPruningOnExpr.toString) {
- val query1 = spark.table("contacts")
- .select(explode(col("friends.first")))
- if (nestedPruning) {
- // If `NESTED_SCHEMA_PRUNING_ENABLED` is enabled,
- // even disabling `NESTED_PRUNING_ON_EXPRESSIONS`,
- // nested schema is still pruned at scan node.
- checkScan(query1, "struct<friends:array<struct<first:string>>>")
- } else {
- checkScan(query1, "struct<friends:array<struct<first:string,middle:string,last:string>>>")
- }
- checkAnswer(query1, Row("Susan") :: Nil)
-
- val query2 = spark.table("contacts")
- .select(explode(col("friends.first")), col("friends.middle"))
- if (nestedPruning) {
- checkScan(query2, "struct<friends:array<struct<first:string,middle:string>>>")
- } else {
- checkScan(query2, "struct<friends:array<struct<first:string,middle:string,last:string>>>")
- }
- checkAnswer(query2, Row("Susan", Array("Z.")) :: Nil)
- }
- }
- }
-
protected def testSchemaPruning(testName: String)(testThunk: => Unit): Unit = {
test(s"Spark vectorized reader - without partition data column - $testName") {
withSQLConf(vectorizedReaderEnabledKey -> "true") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org