Posted to issues@spark.apache.org by "gaoyajun02 (Jira)" <ji...@apache.org> on 2021/07/29 04:52:00 UTC

[jira] [Created] (SPARK-36339) aggsBuffer should collect AggregateExpression in the map range

gaoyajun02 created SPARK-36339:
----------------------------------

             Summary: aggsBuffer should collect AggregateExpression in the map range
                 Key: SPARK-36339
                 URL: https://issues.apache.org/jira/browse/SPARK-36339
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.2, 3.0.3, 2.4.8
            Reporter: gaoyajun02


Demo for this issue:

The first query below executes normally. The second query, which only swaps the order of the selected columns, fails with an analysis error.

{code:sql}
-- Runs without error
SELECT name, count(name) c
FROM VALUES ('Alice'), ('Bob') people(name)
GROUP BY name GROUPING SETS(name);

-- Fails after swapping the order of the selected columns
SELECT count(name) c, name
FROM VALUES ('Alice'), ('Bob') people(name)
GROUP BY name GROUPING SETS(name);
{code}
The error message is:

 
{code:java}

Error in query: expression 'people.`name`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [name#5, spark_grouping_id#3], [count(name#1) AS c#0L, name#1]
+- Expand [List(name#1, name#4, 0)], [name#1, name#5, spark_grouping_id#3]
   +- Project [name#1, name#1 AS name#4]
      +- SubqueryAlias `people`
         +- LocalRelation [name#1]

{code}
So far, I have verified that versions before 2.3 do not have this problem.

 

During debugging, I found that the behavior of constructAggregateExprs in ResolveGroupingAnalytics has changed. 
{code:java}
    /*
     * Construct new aggregate expressions by replacing grouping functions.
     */
    private def constructAggregateExprs(
        groupByExprs: Seq[Expression],
        aggregations: Seq[NamedExpression],
        groupByAliases: Seq[Alias],
        groupingAttrs: Seq[Expression],
        gid: Attribute): Seq[NamedExpression] = aggregations.map {
      // collect all the found AggregateExpression, so we can check an expression is part of
      // any AggregateExpression or not.
      val aggsBuffer = ArrayBuffer[Expression]()
      // Returns whether the expression belongs to any expressions in `aggsBuffer` or not.
      def isPartOfAggregation(e: Expression): Boolean = {
        aggsBuffer.exists(a => a.find(_ eq e).isDefined)
      }
      replaceGroupingFunc(_, groupByExprs, gid).transformDown {
        // AggregateExpression should be computed on the unmodified value of its argument
        // expressions, so we should not replace any references to grouping expression
        // inside it.
        case e: AggregateExpression =>
          aggsBuffer += e
          e
        case e if isPartOfAggregation(e) => e
        case e =>
          // Replace expression by expand output attribute.
          val index = groupByAliases.indexWhere(_.child.semanticEquals(e))
          if (index == -1) {
            e
          } else {
            groupingAttrs(index)
          }
      }.asInstanceOf[NamedExpression]
    }

{code}
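In aggregations.map, aggsBuffer appears to be declared outside the scope of the function applied to each element. It is therefore created once and accumulates the AggregateExpression instances found in all of the elements processed by map, so an aggregate collected from an earlier select-list item can affect how a later item is rewritten. That would explain why the column order matters. This was not the case before 2.3.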
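
The scoping behavior can be reproduced without Spark. The following is a minimal sketch, assuming the cause is Scala's placeholder syntax: in a block passed to map, only the trailing expression containing `_` becomes the function, while the statements before it run once. The names PlaceholderScopeDemo, sharedBuffer, perElementBuffer and record are hypothetical and do not appear in Spark.

{code:scala}
import scala.collection.mutable.ArrayBuffer

object PlaceholderScopeDemo {
  // The buffer is created once: the block's statements run a single time, and
  // only the trailing expression containing `_` is the function given to map.
  def sharedBuffer(items: Seq[String]): Seq[Int] = items.map {
    val seen = ArrayBuffer[String]()
    record(seen, _)
  }

  // The buffer is created per element: everything after `item =>` is the
  // function body, so each call starts with an empty buffer.
  def perElementBuffer(items: Seq[String]): Seq[Int] = items.map { item =>
    val seen = ArrayBuffer[String]()
    record(seen, item)
  }

  // Hypothetical helper: appends the item and returns the resulting buffer size.
  private def record(seen: ArrayBuffer[String], item: String): Int = {
    seen += item
    seen.size
  }

  def main(args: Array[String]): Unit = {
    println(sharedBuffer(Seq("a", "b", "c")))     // List(1, 2, 3)
    println(perElementBuffer(Seq("a", "b", "c"))) // List(1, 1, 1)
  }
}
{code}

If this is indeed the cause, declaring aggsBuffer inside an explicit lambda (for example aggregations.map { agg => ... }) would give each aggregate expression its own buffer. This is a suggestion only and has not been verified against the Spark test suite.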


