You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/01/08 09:06:43 UTC

[spark] branch branch-3.1 updated: [SPARK-34003][SQL] Fix Rule conflicts between PaddingAndLengthCheckForCharVarchar and ResolveAggregateFunctions

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c8bf22e  [SPARK-34003][SQL] Fix Rule conflicts between PaddingAndLengthCheckForCharVarchar and ResolveAggregateFunctions
c8bf22e is described below

commit c8bf22e9f7b73cd6f8873c8ea466c59361335ae5
Author: Kent Yao <ya...@apache.org>
AuthorDate: Fri Jan 8 09:05:22 2021 +0000

    [SPARK-34003][SQL] Fix Rule conflicts between PaddingAndLengthCheckForCharVarchar and ResolveAggregateFunctions
    
    ### What changes were proposed in this pull request?
    
    `ResolveAggregateFunctions` is a hacky rule: it calls `executeSameContext` to generate a `resolved agg`, which it uses to determine which unresolved sort attributes should be pushed into the aggregate. However, since the `PaddingAndLengthCheckForCharVarchar` rule was added, the query output gets rewritten, so the `resolved agg` can no longer match the original attributes.
    
    As a result, an unassociated sort attribute is pushed into the aggregate and the query fails:
    
    ``` logtalk
    [info]   Failed to analyze query: org.apache.spark.sql.AnalysisException: expression 'testcat.t1.`v`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
    [info]   Project [v#14, sum(i)#11L]
    [info]   +- Sort [aggOrder#12 ASC NULLS FIRST], true
    [info]      +- !Aggregate [v#14], [v#14, sum(cast(i#7 as bigint)) AS sum(i)#11L, v#13 AS aggOrder#12]
    [info]         +- SubqueryAlias testcat.t1
    [info]            +- Project [if ((length(v#6) <= 3)) v#6 else if ((length(rtrim(v#6, None)) > 3)) cast(raise_error(concat(input string of length , cast(length(v#6) as string),  exceeds varchar type length limitation: 3)) as string) else rpad(rtrim(v#6, None), 3,  ) AS v#14, i#7]
    [info]               +- RelationV2[v#6, i#7, index#15, _partition#16] testcat.t1
    [info]
    [info]   Project [v#14, sum(i)#11L]
    [info]   +- Sort [aggOrder#12 ASC NULLS FIRST], true
    [info]      +- !Aggregate [v#14], [v#14, sum(cast(i#7 as bigint)) AS sum(i)#11L, v#13 AS aggOrder#12]
    [info]         +- SubqueryAlias testcat.t1
    [info]            +- Project [if ((length(v#6) <= 3)) v#6 else if ((length(rtrim(v#6, None)) > 3)) cast(raise_error(concat(input string of length , cast(length(v#6) as string),  exceeds varchar type length limitation: 3)) as string) else rpad(rtrim(v#6, None), 3,  ) AS v#14, i#7]
    [info]               +- RelationV2[v#6, i#7, index#15, _partition#16] testcat.t1
    ```
    
    ### Why are the changes needed?
    
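    Bug fix: without this change, a query that both groups by and orders by a char/varchar column fails analysis with the error shown above.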
    ### Does this PR introduce _any_ user-facing change?
    
    no
    ### How was this patch tested?
    
    A new test in `CharVarcharTestSuite`.
    
    Closes #31027 from yaooqinn/SPARK-34003.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 0f8e5dd445b03161a27893ba714db57919d8bcab)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 14 ++++++++++----
 .../scala/org/apache/spark/sql/CharVarcharTestSuite.scala  |  8 ++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d138ff3..7a3b8b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2323,16 +2323,22 @@ class Analyzer(override val catalogManager: CatalogManager)
           // to push down this ordering expression and can reference the original aggregate
           // expression instead.
           val needsPushDown = ArrayBuffer.empty[NamedExpression]
-          val evaluatedOrderings = resolvedAliasedOrdering.zip(unresolvedSortOrders).map {
-            case (evaluated, order) =>
+          val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering)
+          val evaluatedOrderings = resolvedAliasedOrdering.zip(orderToAlias).map {
+            case (evaluated, (order, aliasOrder)) =>
               val index = originalAggExprs.indexWhere {
                 case Alias(child, _) => child semanticEquals evaluated.child
                 case other => other semanticEquals evaluated.child
               }
 
               if (index == -1) {
-                needsPushDown += evaluated
-                order.copy(child = evaluated.toAttribute)
+                if (CharVarcharUtils.getRawType(evaluated.metadata).nonEmpty) {
+                  needsPushDown += aliasOrder
+                  order.copy(child = aliasOrder)
+                } else {
+                  needsPushDown += evaluated
+                  order.copy(child = evaluated.toAttribute)
+                }
               } else {
                 order.copy(child = originalAggExprs(index).toAttribute)
               }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index d20cee0..fb35d6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -466,6 +466,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
         Row("c"))
     }
   }
+
+  test("SPARK-34003: fix char/varchar fails w/ both group by and order by ") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
+      sql("INSERT INTO t VALUES ('c', 1)")
+      checkAnswer(sql("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v"), Row("c", 1))
+    }
+  }
 }
 
 // Some basic char/varchar tests which doesn't rely on table implementation.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org