You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/13 11:58:35 UTC

[spark] branch branch-3.1 updated: [SPARK-35014] Fix the PhysicalAggregation pattern to not rewrite foldable expressions

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c71126a  [SPARK-35014] Fix the PhysicalAggregation pattern to not rewrite foldable expressions
c71126a is described below

commit c71126ad330b97edb5b1770e646c6a868358a19b
Author: Yingyi Bu <yi...@databricks.com>
AuthorDate: Tue Apr 13 19:57:13 2021 +0800

    [SPARK-35014] Fix the PhysicalAggregation pattern to not rewrite foldable expressions
    
    ### What changes were proposed in this pull request?
    
    Fix PhysicalAggregation to not transform a foldable expression.
    
    ### Why are the changes needed?
    
    Without this fix, PhysicalAggregation can break certain queries, as the added unit test shows.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it fixes undesirable errors caused by a returned TypeCheckFailure from places like RegExpReplace.checkInputDataTypes.
    
    Closes #32113 from sigmod/foldable.
    
    Authored-by: Yingyi Bu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 9cd25b46b9d1de0c7cdecdabd8cf37b25ec2d78a)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/planning/patterns.scala     |  2 +-
 .../catalyst/util/PhysicalAggregationSuite.scala   | 54 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 2880e87..c22a874 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -333,7 +333,7 @@ object PhysicalAggregation {
           case ue: PythonUDF if PythonUDF.isGroupedAggPandasUDF(ue) =>
             equivalentAggregateExpressions.getEquivalentExprs(ue).headOption
               .getOrElse(ue).asInstanceOf[PythonUDF].resultAttribute
-          case expression =>
+          case expression if !expression.foldable =>
             // Since we're using `namedGroupingAttributes` to extract the grouping key
             // columns, we need to replace grouping key expressions with their corresponding
             // attributes. We do not rely on the equality check at here since attributes may
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/PhysicalAggregationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/PhysicalAggregationSuite.scala
new file mode 100644
index 0000000..b8c60df
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/PhysicalAggregationSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalAggregation
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+
+class PhysicalAggregationSuite extends PlanTest {
+  val testRelation = LocalRelation('a.int, 'b.int)
+
+  test("SPARK-35014: a foldable expression should not be replaced by an AttributeReference") {
+    val query = testRelation
+      .groupBy('a, Literal.create(1) as 'k)(
+        'a, Round(Literal.create(1.2), Literal.create(1)) as 'r, count('b) as 'c)
+    val analyzedQuery = SimpleAnalyzer.execute(query)
+
+    val PhysicalAggregation(
+      groupingExpressions,
+      aggregateExpressions,
+      resultExpressions,
+      _ /* child */
+    ) = analyzedQuery
+
+    assertResult(2)(groupingExpressions.length)
+    assertResult(1)(aggregateExpressions.length)
+    assertResult(3)(resultExpressions.length)
+
+    // Round's scale must still be a Literal, i.e. not replaced by an AttributeReference.
+    resultExpressions(1) match {
+      case Alias(Round(_, _: Literal), _) =>
+      case other => fail("unexpected result expression: " + other)
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org