Posted to issues@spark.apache.org by "Lukas Grasmann (Jira)" <ji...@apache.org> on 2022/04/26 08:44:00 UTC

[jira] [Updated] (SPARK-39022) Spark SQL - Combination of HAVING and SORT not resolved correctly

     [ https://issues.apache.org/jira/browse/SPARK-39022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lukas Grasmann updated SPARK-39022:
-----------------------------------
    Attachment: explain_new.txt
                explain_old.txt

> Spark SQL - Combination of HAVING and SORT not resolved correctly
> -----------------------------------------------------------------
>
>                 Key: SPARK-39022
>                 URL: https://issues.apache.org/jira/browse/SPARK-39022
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2, 3.2.1, 3.4
>            Reporter: Lukas Grasmann
>            Priority: Major
>         Attachments: explain_new.txt, explain_old.txt
>
>
> h1. Spark SQL - Combination of HAVING and SORT not resolved correctly
> Example: Consider a simple relation {{test}} with two relevant columns, {{hotel}} and {{price}}, where {{hotel}} is a unique identifier of a hotel and {{price}} is the cost of a night's stay. We would like to order the {{hotel}}s by their cumulative price, but only for hotels whose cumulative price is higher than {{150}}.
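> The intended semantics of this example can be sketched without Spark using plain Scala collections. This is only an illustration: the rows in {{test}} are hypothetical sample data, and {{Row}}, {{HavingSortSemantics}} and {{expensiveHotelsByTotal}} are names made up for this sketch, not part of Spark or of the original report.
> {code:scala}
> object HavingSortSemantics {
>   // Hypothetical stand-in for one row of the relation `test`.
>   final case class Row(hotel: String, price: Double)
>
>   // Hypothetical sample data (not taken from the report).
>   val test: Seq[Row] = Seq(
>     Row("A", 100.0), Row("A", 120.0), // cumulative price 220
>     Row("B", 90.0), Row("B", 80.0),   // cumulative price 170
>     Row("C", 60.0), Row("C", 50.0)    // cumulative price 110, dropped by HAVING
>   )
>
>   def expensiveHotelsByTotal(rows: Seq[Row], threshold: Double): Seq[String] =
>     rows
>       .groupBy(_.hotel)                                         // GROUP BY hotel
>       .map { case (hotel, rs) => hotel -> rs.map(_.price).sum } // sum(price)
>       .filter { case (_, total) => total > threshold }          // HAVING sum(price) > threshold
>       .toSeq
>       .sortBy { case (_, total) => total }                      // ORDER BY sum(price)
>       .map { case (hotel, _) => hotel }                         // SELECT hotel
> }
> {code}
> With this sample data, {{HavingSortSemantics.expensiveHotelsByTotal(HavingSortSemantics.test, 150.0)}} returns {{Seq("B", "A")}}. Note that the projection to {{hotel}} happens last, after the sort has used the aggregate value.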
> h2. Current Behavior
> To achieve the goal specified above, we give a simple query that works in most common database systems. Note that the {{SELECT}} list contains only {{hotel}}, which means that the aggregate has to be removed from the result attributes using a {{Project}} node.
> {code:scala}
> sqlcontext.sql("SELECT hotel FROM test GROUP BY hotel HAVING sum(price) > 150 ORDER BY sum(price)").show{code}
> Currently, this yields an {{AnalysisException}} because the aggregate {{sum(price)}} in {{Sort}} is not resolved correctly. Note that the child of {{Sort}} is a (premature) {{Project}} node which only provides {{hotel}} as its output. This prevents the aggregate values from being passed to {{Sort}}.
> {code:scala}
> org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did you mean one of the following? [test.hotel]; line 1 pos 75;
> 'Sort ['sum('price) ASC NULLS FIRST], true
> +- Project [hotel#17]
>    +- Filter (sum(cast(price#18 as double))#22 > cast(150 as double))
>       +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS sum(cast(price#18 as double))#22]
>          +- SubqueryAlias test
>             +- View (`test`, [hotel#17,price#18])
>                +- Relation [hotel#17,price#18] csv
> {code}
> The introduced {{Project}} is, however, not the only cause of the {{AnalysisException}}, as can be seen in the following example. Here, {{sum(price)}} is part of the result and is therefore *not* removed using a {{Project}} node.
> {code:scala}
> sqlcontext.sql("SELECT hotel, sum(price) FROM test GROUP BY hotel HAVING sum(price) > 150 ORDER BY sum(price)").show{code}
> Even without a {{Project}}, resolving the aggregate {{sum(price)}} (i.e., resolving it to the aggregate introduced by the {{Aggregate}} node) still fails. Spark throws the following {{AnalysisException}}, which is similar to the previous one. It follows that there is a second error in the analyzer that prevents successful resolution even if the problem regarding the {{Project}} node is fixed.
> {code:scala}
> org.apache.spark.sql.AnalysisException: Column 'price' does not exist. Did you mean one of the following? [sum(price), test.hotel]; line 1 pos 87;
> 'Sort ['sum('price) ASC NULLS FIRST], true
> +- Filter (sum(price)#24 > cast(150 as double))
>    +- Aggregate [HOTEL#17], [hotel#17, sum(cast(price#18 as double)) AS sum(price)#24]
>       +- SubqueryAlias test
>          +- View (`test`, [hotel#17,price#18])
>             +- Relation [hotel#17,price#18] csv
> {code}
>  
> This error occurs (at least) in Spark versions 3.1.2, 3.2.1, as well as the latest version from the GitHub {{master}} branch.
> h2. Current Workaround
> The issue can currently be worked around by using a subquery to first retrieve only the hotels which fulfill the condition and then ordering them in the outer query:
> {code:sql}
> SELECT hotel, sum_price FROM
>     (SELECT hotel, sum(price) AS sum_price FROM test GROUP BY hotel HAVING sum(price) > 150) sub
> ORDER BY sum_price;
> {code}
> h2. Proposed Solution(s)
> The first change fixes the (premature) insertion of {{Project}} before a {{Sort}} by moving the {{Project}} up in the plan so that the {{Project}} becomes the parent of the {{Sort}} instead of vice versa. This does not change the results of the computation since neither {{Sort}} nor {{Project}} adds or removes tuples.
> There are two potential side effects of this solution:
>  * It may change some plans generated through the DataFrame/Dataset API which previously produced similar errors, so that they now yield a result instead. However, this is unlikely to produce unexpected or undesired results (see above).
>  * Moving the projection might reduce performance for {{Sort}} since its input is potentially bigger.
> {code:scala}
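> import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort}
> import org.apache.spark.sql.catalyst.rules.Rule
>
> // Moves a Project that sits between a Sort and a Filter over an Aggregate
> // above the Sort, so that the Sort can still see the aggregate's output.
> object PreventPrematureProjections extends Rule[LogicalPlan] {
>     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
>         // Matches Sort -> Project -> Filter -> Aggregate (parent to child).
>         case sort @ Sort(_, _, project @ Project(_, filter @ Filter(_, _: Aggregate))) =>
>             // Result: Project -> Sort -> Filter -> Aggregate.
>             project.copy(child = sort.copy(child = filter))
>     }
> }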
> {code}
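> The effect of this rewrite can be illustrated with a heavily simplified, Spark-free sketch. All types below ({{Plan}}, {{Aggregate}}, {{Filter}}, {{Project}}, {{Sort}}) are hypothetical stand-ins defined only for this example and carry plain strings instead of Catalyst expressions; {{moveProjectAboveSort}} is likewise an assumed name, not a Spark API.
> {code:scala}
> object PlanRewriteSketch {
>   // Hypothetical, minimal stand-ins for Catalyst's logical plan nodes.
>   sealed trait Plan
>   final case class Aggregate(groupBy: String, outputs: Seq[String]) extends Plan
>   final case class Filter(condition: String, child: Plan) extends Plan
>   final case class Project(columns: Seq[String], child: Plan) extends Plan
>   final case class Sort(order: String, child: Plan) extends Plan
>
>   // Rewrites Sort -> Project -> Filter -> Aggregate
>   // into     Project -> Sort -> Filter -> Aggregate.
>   // Any other plan shape is returned unchanged.
>   def moveProjectAboveSort(plan: Plan): Plan = plan match {
>     case Sort(order, Project(columns, filter @ Filter(_, _: Aggregate))) =>
>       Project(columns, Sort(order, filter))
>     case other => other
>   }
> }
> {code}
> After the rewrite, the child of {{Sort}} is the {{Filter}} over the {{Aggregate}}, so the aggregate output is still available when the sort expression is resolved, and the projection to the selected columns happens last.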
>  
> To solve the second problem with aggregates not being resolved, we introduce a new case for {{ResolveAggregateFunctions}} as the second change. The newly introduced code is similar to the cases already in place. Here, we ensure that aggregate functions of the plan can also be resolved if there is a {{Filter}} (introduced by resolving an {{UnresolvedHaving}} node generated by parsing the original query) "between" the {{Sort}} and the {{Aggregate}} nodes.
> {code:scala}
> case Sort(sortOrder, global, filter @ Filter(_, agg: Aggregate)) if agg.resolved =>
>     val maybeResolved = sortOrder.map(_.child).map(resolveExpressionByPlanOutput(_, agg))
>     resolveOperatorWithAggregate(maybeResolved, agg, (newExprs, newChild) => {
>         val newSortOrder = sortOrder.zip(newExprs).map {
>             case (order, expr) => order.copy(child = expr)
>         }
>         Sort(newSortOrder, global, filter.copy(child = newChild))
>     })
> {code}
> h2. Changed Behavior
> The behavior of one of the TPC-DS v2.7 plan stability suite tests changed. For {{tpcds-v2.7.0/q6.sql}}, the resolved plan is now slightly different since the resolution has changed. This should, however, not affect the results of the query, as its output consists of only two attributes ({{state}} and {{cnt}}).
>  
> Old {{explain.txt}}:
>  
> {code:java}
> (37) HashAggregate [codegen id : 8]
> Input [2]: [ca_state#2, count#27]
> Keys [1]: [ca_state#2]
> Functions [1]: [count(1)]
> Aggregate Attributes [1]: [count(1)#29]
> Results [3]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31, ca_state#2]
> (38) Filter [codegen id : 8]
> Input [3]: [state#30, cnt#31, ca_state#2]
> Condition : (cnt#31 >= 10)
> (39) TakeOrderedAndProject
> Input [3]: [state#30, cnt#31, ca_state#2]
> Arguments: 100, [cnt#31 ASC NULLS FIRST, ca_state#2 ASC NULLS FIRST], [state#30, cnt#31]{code}
>  
> New {{explain.txt}}:
>  
> {code:java}
> (37) HashAggregate [codegen id : 8]
> Input [2]: [ca_state#2, count#27]
> Keys [1]: [ca_state#2]
> Functions [1]: [count(1)]
> Aggregate Attributes [1]: [count(1)#29]
> Results [2]: [ca_state#2 AS state#30, count(1)#29 AS cnt#31]
> (38) Filter [codegen id : 8]
> Input [2]: [state#30, cnt#31]
> Condition : (cnt#31 >= 10)
> (39) TakeOrderedAndProject
> Input [2]: [state#30, cnt#31]
> Arguments: 100, [cnt#31 ASC NULLS FIRST, state#30 ASC NULLS FIRST], [state#30, cnt#31]{code}


