You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/23 05:30:49 UTC

[spark] branch master updated: [SPARK-39548][SQL] CreateView Command with a window clause query hit a wrong window definition not found issue

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4718d59c6c4 [SPARK-39548][SQL] CreateView Command with a window clause query hit a wrong window definition not found issue
4718d59c6c4 is described below

commit 4718d59c6c4e201bf940303a4311dfb753372395
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Jun 23 13:30:34 2022 +0800

    [SPARK-39548][SQL] CreateView Command with a window clause query hit a wrong window definition not found issue
    
    ### What changes were proposed in this pull request?
    
    1. In the inline CTE code path, fix a bug where the top-down unresolved window expression check misclassifies a correctly defined window expression as undefined.
    2. Move the unresolved window expression check for `Project` nodes from `WindowsSubstitution` to `CheckAnalysis`.
    
    ### Why are the changes needed?
    
    Without this fix, a valid query fails analysis with a spurious "window definition not found" error.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    A new unit test in `SQLQuerySuite`.
    
    Closes #36947 from amaliujia/improvewindow.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 10 +---------
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 10 +++++++++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 23 ++++++++++++++++++++++
 3 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4d2dd175260..2665931686e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -451,7 +451,7 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Substitute child plan with WindowSpecDefinitions.
    */
   object WindowsSubstitution extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning(
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
       // Lookup WindowSpecDefinitions. This rule works with unresolved children.
       case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions {
@@ -460,14 +460,6 @@ class Analyzer(override val catalogManager: CatalogManager)
             throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
           WindowExpression(c, windowSpecDefinition)
       }
-
-      case p @ Project(projectList, _) =>
-        projectList.foreach(_.transformDownWithPruning(
-          _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
-          case UnresolvedWindowExpression(_, windowSpec) =>
-            throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
-        })
-        p
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 759683b8c00..cf734b7aa26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, Decorrela
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -231,7 +232,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             failAnalysis("grouping_id() can only be used with GroupingSets/Cube/Rollup")
 
           case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) &&
-              !e.isInstanceOf[WindowExpression] =>
+              !e.isInstanceOf[WindowExpression] && e.resolved =>
             val w = e.children.find(_.isInstanceOf[WindowFunction]).get
             failAnalysis(s"Window function $w requires an OVER clause.")
 
@@ -542,6 +543,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               s"""Only a single table generating function is allowed in a SELECT clause, found:
                  | ${exprs.map(_.sql).mkString(",")}""".stripMargin)
 
+          case p @ Project(projectList, _) =>
+            projectList.foreach(_.transformDownWithPruning(
+              _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION)) {
+              case UnresolvedWindowExpression(_, windowSpec) =>
+                throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
+            })
+
           case j: Join if !j.duplicateResolved =>
             val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
             failAnalysis(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 21430683840..056e59b3b86 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4449,6 +4449,29 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
         """.stripMargin),
       Seq(Row(2), Row(1)))
   }
+
+  test("SPARK-39548: CreateView will make queries go into inline CTE code path thus " +
+    "trigger a mis-clarified `window definition not found` issue") {
+    sql(
+      """
+        |create or replace temporary view test_temp_view as
+        |with step_1 as (
+        |select * , min(a) over w2 as min_a_over_w2 from
+        |(select 1 as a, 2 as b, 3 as c) window w2 as (partition by b order by c)) , step_2 as
+        |(
+        |select *, max(e) over w1 as max_a_over_w1
+        |from (select 1 as e, 2 as f, 3 as g)
+        |join step_1 on true
+        |window w1 as (partition by f order by g)
+        |)
+        |select *
+        |from step_2
+        |""".stripMargin)
+
+    checkAnswer(
+      sql("select * from test_temp_view"),
+      Row(1, 2, 3, 1, 2, 3, 1, 1))
+  }
 }
 
 case class Foo(bar: Option[String])


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org