Posted to commits@spark.apache.org by do...@apache.org on 2020/02/06 21:35:17 UTC

[spark] branch branch-3.0 updated: [SPARK-27986][SQL][FOLLOWUP] window aggregate function with filter predicate is not supported

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1baee64  [SPARK-27986][SQL][FOLLOWUP] window aggregate function with filter predicate is not supported
1baee64 is described below

commit 1baee64750b4098ec37be6408906c70674579eb8
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Feb 6 13:33:39 2020 -0800

    [SPARK-27986][SQL][FOLLOWUP] window aggregate function with filter predicate is not supported
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/26656.
    
    We don't support window aggregate functions with a filter predicate yet, so we should fail explicitly instead of silently ignoring the filter.
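    
    For illustration, here is a minimal, hedged sketch of the window case. The SQL is taken from the new `window.sql` test added below; the local session, object name, and sample rows are made up for this example and are not part of the patch:
    
    ```scala
    import org.apache.spark.sql.{AnalysisException, SparkSession}
    
    object WindowFilterDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical local session and sample data, matching the (val, cate)
        // schema of the testData view used by sql-tests/inputs/window.sql.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("window-filter-demo")
          .getOrCreate()
        import spark.implicits._
        Seq((1, "a"), (2, "a"), (1, "b"), (3, "b"))
          .toDF("val", "cate")
          .createOrReplaceTempView("testData")
    
        try {
          // Before this patch the FILTER clause was silently dropped, so the
          // count covered every row in the partition; now analysis fails.
          spark.sql(
            """SELECT val, cate,
              |       count(val) FILTER (WHERE val > 1) OVER (PARTITION BY cate)
              |FROM testData ORDER BY cate, val""".stripMargin).collect()
        } catch {
          case e: AnalysisException =>
            // Expected: "window aggregate function with filter predicate is
            // not supported yet."
            println(e.getMessage)
        }
        spark.stop()
      }
    }
    ```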
    
    Observable metrics have the same issue; this PR fixes it as well.
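    
    The observable-metrics path can be sketched similarly (again hedged: the `observe` call, metric name, and data below are illustrative only, not part of the patch). The new CheckAnalysis rule should reject the FILTER clause on the metric aggregate:
    
    ```scala
    import org.apache.spark.sql.{AnalysisException, SparkSession}
    import org.apache.spark.sql.functions.expr
    
    object ObservedMetricsFilterDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("observed-metrics-filter-demo")
          .getOrCreate()
        import spark.implicits._
        val df = Seq(1, 2, 3).toDF("val")
    
        try {
          // observe() attaches a CollectMetrics node to the plan; the added
          // check rejects metric aggregates that carry a filter predicate.
          df.observe("metrics", expr("count(val) FILTER (WHERE val > 1)")).collect()
        } catch {
          case e: AnalysisException =>
            // Expected: "aggregates with filter predicate are not allowed in
            // observed metrics, but found: ..."
            println(e.getMessage)
        }
        spark.stop()
      }
    }
    ```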
    
    ### Why are the changes needed?
    
    If we simply ignore the filter predicate when it is not supported, the query silently returns a wrong result.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, it fixes the query result: such queries now fail explicitly instead of returning a wrong result.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #27476 from cloud-fan/filter.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 5a4c70b4e2367441ce4260f02d39d3345078f411)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala       |  4 ++++
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala  |  3 +++
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala   | 20 ++++++++++++++++++--
 .../spark/sql/catalyst/analysis/AnalysisSuite.scala  | 10 +++++++++-
 .../src/test/resources/sql-tests/inputs/window.sql   |  5 +++++
 .../test/resources/sql-tests/results/window.sql.out  | 13 ++++++++++++-
 6 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 56cc2a2..75f1aa7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2428,6 +2428,10 @@ class Analyzer(
             }
             wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec)
 
+          case WindowExpression(ae: AggregateExpression, _) if ae.filter.isDefined =>
+            failAnalysis(
+              "window aggregate function with filter predicate is not supported yet.")
+
           // Extract Windowed AggregateExpression
           case we @ WindowExpression(
               ae @ AggregateExpression(function, _, _, _, _),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 4ec737f..e769e03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -308,6 +308,9 @@ trait CheckAnalysis extends PredicateHelper {
                 case a: AggregateExpression if a.isDistinct =>
                   e.failAnalysis(
                     "distinct aggregates are not allowed in observed metrics, but found: " + s.sql)
+                case a: AggregateExpression if a.filter.isDefined =>
+                  e.failAnalysis("aggregates with filter predicate are not allowed in " +
+                    "observed metrics, but found: " + s.sql)
                 case _: Attribute if !seenAggregate =>
                   e.failAnalysis (s"attribute ${s.sql} can only be used as an argument to an " +
                     "aggregate function.")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 7023dbe..5cc0453 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -165,6 +165,22 @@ class AnalysisErrorSuite extends AnalysisTest {
     "Distinct window functions are not supported" :: Nil)
 
   errorTest(
+    "window aggregate function with filter predicate",
+    testRelation2.select(
+      WindowExpression(
+        AggregateExpression(
+          Count(UnresolvedAttribute("b")),
+          Complete,
+          isDistinct = false,
+          filter = Some(UnresolvedAttribute("b") > 1)),
+        WindowSpecDefinition(
+          UnresolvedAttribute("a") :: Nil,
+          SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
+          UnspecifiedFrame)).as("window")),
+    "window aggregate function with filter predicate is not supported" :: Nil
+  )
+
+  errorTest(
     "distinct function",
     CatalystSqlParser.parsePlan("SELECT hex(DISTINCT a) FROM TaBlE"),
     "DISTINCT or FILTER specified, but hex is not an aggregate function" :: Nil)
@@ -191,12 +207,12 @@ class AnalysisErrorSuite extends AnalysisTest {
     "FILTER predicate specified, but aggregate is not an aggregate function" :: Nil)
 
   errorTest(
-    "DISTINCT and FILTER cannot be used in aggregate functions at the same time",
+    "DISTINCT aggregate function with filter predicate",
     CatalystSqlParser.parsePlan("SELECT count(DISTINCT a) FILTER (WHERE c > 1) FROM TaBlE2"),
     "DISTINCT and FILTER cannot be used in aggregate functions at the same time" :: Nil)
 
   errorTest(
-    "FILTER expression is non-deterministic, it cannot be used in aggregate functions",
+    "non-deterministic filter predicate in aggregate functions",
     CatalystSqlParser.parsePlan("SELECT count(a) FILTER (WHERE rand(int(c)) > 1) FROM TaBlE2"),
     "FILTER expression is non-deterministic, it cannot be used in aggregate functions" :: Nil)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 5405009..c747d39 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{Count, Sum}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -736,5 +736,13 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       b :: ScalarSubquery(subquery, Nil).as("sum") :: Nil,
       CollectMetrics("evt1", count :: Nil, tblB))
     assertAnalysisError(query, "Multiple definitions of observed metrics" :: "evt1" :: Nil)
+
+    // Aggregate with filter predicate - fail
+    val sumWithFilter = sum.transform {
+      case a: AggregateExpression => a.copy(filter = Some(true))
+    }.asInstanceOf[NamedExpression]
+    assertAnalysisError(
+      CollectMetrics("evt1", sumWithFilter :: Nil, testRelation),
+      "aggregates with filter predicate are not allowed" :: Nil)
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index e25a252..3d05dfd 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -120,3 +120,8 @@ SELECT cate, sum(val) OVER (w)
 FROM testData
 WHERE val is not null
 WINDOW w AS (PARTITION BY cate ORDER BY val);
+
+-- with filter predicate
+SELECT val, cate,
+count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate)
+FROM testData ORDER BY cate, val;
\ No newline at end of file
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out
index f795374..625088f 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 23
+-- Number of queries: 24
 
 
 -- !query
@@ -380,3 +380,14 @@ a	4
 b	1
 b	3
 b	6
+
+
+-- !query
+SELECT val, cate,
+count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate)
+FROM testData ORDER BY cate, val
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+window aggregate function with filter predicate is not supported yet.;

