Posted to commits@spark.apache.org by we...@apache.org on 2021/01/08 09:06:43 UTC
[spark] branch branch-3.1 updated: [SPARK-34003][SQL] Fix Rule
conflicts between PaddingAndLengthCheckForCharVarchar and
ResolveAggregateFunctions
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new c8bf22e [SPARK-34003][SQL] Fix Rule conflicts between PaddingAndLengthCheckForCharVarchar and ResolveAggregateFunctions
c8bf22e is described below
commit c8bf22e9f7b73cd6f8873c8ea466c59361335ae5
Author: Kent Yao <ya...@apache.org>
AuthorDate: Fri Jan 8 09:05:22 2021 +0000
[SPARK-34003][SQL] Fix Rule conflicts between PaddingAndLengthCheckForCharVarchar and ResolveAggregateFunctions
### What changes were proposed in this pull request?
ResolveAggregateFunctions is a hacky rule: it calls `executeSameContext` to generate a `resolved agg`, which it then uses to determine which unresolved sort attributes should be pushed into the aggregate. However, the PaddingAndLengthCheckForCharVarchar rule, added since, rewrites the query output, so the `resolved agg` no longer matches the original attributes.
As a result, a mismatched sort attribute is pushed into the aggregate and the query fails:
```
[info] Failed to analyze query: org.apache.spark.sql.AnalysisException: expression 'testcat.t1.`v`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
[info] Project [v#14, sum(i)#11L]
[info] +- Sort [aggOrder#12 ASC NULLS FIRST], true
[info]    +- !Aggregate [v#14], [v#14, sum(cast(i#7 as bigint)) AS sum(i)#11L, v#13 AS aggOrder#12]
[info]       +- SubqueryAlias testcat.t1
[info]          +- Project [if ((length(v#6) <= 3)) v#6 else if ((length(rtrim(v#6, None)) > 3)) cast(raise_error(concat(input string of length , cast(length(v#6) as string), exceeds varchar type length limitation: 3)) as string) else rpad(rtrim(v#6, None), 3, ) AS v#14, i#7]
[info]             +- RelationV2[v#6, i#7, index#15, _partition#16] testcat.t1
```
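To make the failure mode concrete, the following is a minimal Scala sketch that models the lookup in ResolveAggregateFunctions with plain case classes. `Attr`, `semanticEquals` and `findInAggregate` are hypothetical stand-ins, not Catalyst APIs, and the aggregate list is reduced to the single grouping column. The only behavior carried over from Catalyst is the assumption that attributes are matched by expression ID rather than by name; the IDs 13 and 14 are taken from the plan above.

```scala
// Hypothetical stand-in for a Catalyst attribute: a display name plus an expression ID.
final case class Attr(name: String, exprId: Long)

object MismatchSketch {
  // Assumption carried over from Catalyst: attributes are matched by expression ID, not name.
  def semanticEquals(a: Attr, b: Attr): Boolean = a.exprId == b.exprId

  // Simplified version of the originalAggExprs.indexWhere(...) lookup in the patch;
  // -1 means the ordering was not found and would be pushed down into the aggregate.
  def findInAggregate(aggExprs: Seq[Attr], evaluated: Attr): Int =
    aggExprs.indexWhere(a => semanticEquals(a, evaluated))

  def main(args: Array[String]): Unit = {
    // The original Aggregate groups on v#14, the output of the char/varchar Project.
    val originalAggExprs = Seq(Attr("v", 14L))
    // The ordering resolved through the separate `resolved agg` refers to v#13.
    val evaluated = Attr("v", 13L)
    println(findInAggregate(originalAggExprs, evaluated))      // -1: same name, different ID
    println(findInAggregate(originalAggExprs, Attr("v", 14L))) // 0: IDs match
  }
}
```

Because the lookup returns -1 even though both attributes are named `v`, the pre-patch code pushes `v#13 AS aggOrder#12` into the aggregate, which is exactly the `!Aggregate` node the analyzer rejects in the log.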
### Why are the changes needed?
Bug fix: without it, a query that both groups by and orders by a CHAR/VARCHAR column fails analysis.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
New test in `CharVarcharTestSuite`.
Closes #31027 from yaooqinn/SPARK-34003.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 0f8e5dd445b03161a27893ba714db57919d8bcab)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 14 ++++++++++----
.../scala/org/apache/spark/sql/CharVarcharTestSuite.scala | 8 ++++++++
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d138ff3..7a3b8b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2323,16 +2323,22 @@ class Analyzer(override val catalogManager: CatalogManager)
// to push down this ordering expression and can reference the original aggregate
// expression instead.
val needsPushDown = ArrayBuffer.empty[NamedExpression]
- val evaluatedOrderings = resolvedAliasedOrdering.zip(unresolvedSortOrders).map {
- case (evaluated, order) =>
+ val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering)
+ val evaluatedOrderings = resolvedAliasedOrdering.zip(orderToAlias).map {
+ case (evaluated, (order, aliasOrder)) =>
val index = originalAggExprs.indexWhere {
case Alias(child, _) => child semanticEquals evaluated.child
case other => other semanticEquals evaluated.child
}
if (index == -1) {
- needsPushDown += evaluated
- order.copy(child = evaluated.toAttribute)
+ if (CharVarcharUtils.getRawType(evaluated.metadata).nonEmpty) {
+ needsPushDown += aliasOrder
+ order.copy(child = aliasOrder)
+ } else {
+ needsPushDown += evaluated
+ order.copy(child = evaluated.toAttribute)
+ }
} else {
order.copy(child = originalAggExprs(index).toAttribute)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index d20cee0..fb35d6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -466,6 +466,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
Row("c"))
}
}
+
+ test("SPARK-34003: fix char/varchar fails w/ both group by and order by ") {
+ withTable("t") {
+ sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
+ sql("INSERT INTO t VALUES ('c', 1)")
+ checkAnswer(sql("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v"), Row("c", 1))
+ }
+ }
}
// Some basic char/varchar tests which doesn't rely on table implementation.
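The decision added to Analyzer.scala in the patch above can be summarized with the toy model below. `PushDown`, `PushUnresolvedAlias`, `PushEvaluated`, `choose` and its Boolean parameters are all hypothetical names used only for illustration. In the real code, `hasRawCharVarcharType` corresponds to `CharVarcharUtils.getRawType(evaluated.metadata).nonEmpty`, and the two push-down cases correspond to adding `aliasOrder` or `evaluated` to `needsPushDown`. As we read the patch, pushing the not-yet-resolved `aliasOrder` lets it be resolved later against the original aggregate's child instead of the separately resolved copy.

```scala
// Hypothetical model of the branch added to ResolveAggregateFunctions by this patch.
sealed trait PushDown
// New behavior: push the alias built before re-analysis (aliasOrder in the patch).
case object PushUnresolvedAlias extends PushDown
// Pre-existing behavior: push the expression from the separate resolution pass (evaluated).
case object PushEvaluated extends PushDown

object FixSketch {
  /**
   * @param foundInAggregate      whether the ordering matched an existing aggregate expression
   * @param hasRawCharVarcharType stand-in for CharVarcharUtils.getRawType(...).nonEmpty
   * @return None when the sort can reuse the aggregate's output attribute directly
   */
  def choose(foundInAggregate: Boolean, hasRawCharVarcharType: Boolean): Option[PushDown] =
    if (foundInAggregate) None
    else if (hasRawCharVarcharType) Some(PushUnresolvedAlias)
    else Some(PushEvaluated)

  def main(args: Array[String]): Unit = {
    println(choose(foundInAggregate = false, hasRawCharVarcharType = true))  // Some(PushUnresolvedAlias)
    println(choose(foundInAggregate = false, hasRawCharVarcharType = false)) // Some(PushEvaluated)
    println(choose(foundInAggregate = true, hasRawCharVarcharType = true))   // None
  }
}
```

Note that the patch keeps the old path for every ordering whose metadata carries no raw CHAR/VARCHAR type, which limits the behavior change to the case that triggered the bug.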