You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "L. C. Hsieh (Jira)" <ji...@apache.org> on 2021/09/08 01:20:00 UTC

[jira] [Resolved] (SPARK-36677) NestedColumnAliasing pushes down aggregate functions into projections

     [ https://issues.apache.org/jira/browse/SPARK-36677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

L. C. Hsieh resolved SPARK-36677.
---------------------------------
    Fix Version/s: 3.2.0
       Resolution: Fixed

Issue resolved by pull request 33921
[https://github.com/apache/spark/pull/33921]

> NestedColumnAliasing pushes down aggregate functions into projections
> ---------------------------------------------------------------------
>
>                 Key: SPARK-36677
>                 URL: https://issues.apache.org/jira/browse/SPARK-36677
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.2.0, 3.3.0
>            Reporter: Venkata Sai Akhil Gudesa
>            Assignee: Venkata Sai Akhil Gudesa
>            Priority: Major
>             Fix For: 3.2.0
>
>
> Aggregate functions are being pushed down into projections when nested columns are accessed, causing the following error:
> {code:java}
> Caused by: UnsupportedOperationException: Cannot generate code for expression: ...{code}
> Reproduction:
>  
> {code:java}
> spark.sql("drop table if exists test_aggregates")
> spark.sql("create table if not exists test_aggregates(a STRUCT<c: STRUCT<e: string>, d: int>, b string)")
> val df = sql("select max(a).c.e from (select a, b from test_aggregates) group by b")
> println(df.queryExecution.optimizedPlan)
> {code}
>  
> The output of the above code:
> {noformat}
> 'Aggregate [b#1], [_extract_e#5 AS max(a).c.e#3]
> +- 'Project [max(a#0).c.e AS _extract_e#5, b#1]
>    +- Relation default.test_aggregates[a#0,b#1] parquet
> {noformat}
> The error message when the dataframe is executed:
> {noformat}
> java.lang.UnsupportedOperationException: Cannot generate code for expression: max(input[0, struct<c:struct<e:string>,d:int>, true])
>   at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotGenerateCodeForExpressionError(QueryExecutionErrors.scala:83)
>   at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode(Expression.scala:312)
>   at org.apache.spark.sql.catalyst.expressions.Unevaluable.doGenCode$(Expression.scala:311)
>   at org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.doGenCode(interfaces.scala:99)
>   at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:151)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:146)
>   at org.apache.spark.sql.catalyst.expressions.UnaryExpression.nullSafeCodeGen(Expression.scala:525)
>   at org.apache.spark.sql.catalyst.expressions.GetStructField.doGenCode(complexTypeExtractors.scala:126)
>   at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:151)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:146)
>   at org.apache.spark.sql.catalyst.expressions.UnaryExpression.nullSafeCodeGen(Expression.scala:525)
>   at org.apache.spark.sql.catalyst.expressions.GetStructField.doGenCode(complexTypeExtractors.scala:126)
>   at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:151)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:146)
>   at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:171)
>   at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$2(basicPhysicalOperators.scala:73)
>   at scala.collection.immutable.List.map(List.scala:293)
>   at org.apache.spark.sql.execution.ProjectExec.$anonfun$doConsume$1(basicPhysicalOperators.scala:73)
>   at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.withSubExprEliminationExprs(CodeGenerator.scala:1039)
>   at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:73)
>   at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:195)
>   at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:150)
>   at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:497)
>   at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:484)
>   at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:457)
>   at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:497)
>   at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:96)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:91)
>   at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:91)
>   at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:497)
>   at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54)
>   at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:96)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:91)
>   at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:91)
>   at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
>   at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:792)
>   at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:151)
>   at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:96)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:91)
>   at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:91)
>   at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:46)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:659)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:722)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
>   at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
>   at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
>   at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:172)
>   at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:256)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3742)
>   at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2998)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3733)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3731)
>   at org.apache.spark.sql.Dataset.collect(Dataset.scala:2998)
>   ... 47 elided
> {noformat}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org