You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "JacobZheng (Jira)" <ji...@apache.org> on 2022/12/01 05:52:00 UTC

[jira] [Commented] (SPARK-41336) BroadcastExchange does not support the execute() code path. when AQE enabled

    [ https://issues.apache.org/jira/browse/SPARK-41336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641679#comment-17641679 ] 

JacobZheng commented on SPARK-41336:
------------------------------------

This case runs successfully on Spark 3.3.1. I will check the differences between the two versions. [~yumwang]

> BroadcastExchange does not support the execute() code path. when AQE enabled
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-41336
>                 URL: https://issues.apache.org/jira/browse/SPARK-41336
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: JacobZheng
>            Priority: Major
>
> I am getting an error when running the following code.
> {code:java}
> val df1 = spark.read.format("delta").load("/table/n4bee1a51083e49e6adacf2a").selectExpr("ID","TITLE")
> val df2 = spark.read.format("delta").load("/table/db8e1ef7f0fdb447d8aae2e7").selectExpr("ID","STATUS").filter("STATUS == 3")
> val df3 = spark.read.format("delta").load("/table/q56719945d2534c9c88eb669").selectExpr("EMPNO","TITLE","LEAVEID","WFINSTANCEID","SUBMIT1").filter("SUBMIT1 == 1")
> val df4 = spark.read.format("delta").load("/table/pd39b547fb6c24382861af92").selectExpr("`年月`")
> val jr1 = df3.join(df2,df3("WFINSTANCEID")===df2("ID"),"inner").select(df3("EMPNO").as("NEWEMPNO"),df3("TITLE").as("NEWTITLE"),df3("LEAVEID"))
> val jr2 = jr1.join(df1,jr1("LEAVEID")===df1("ID"),"LEFT_OUTER").select(jr1("NEWEMPNO"),jr1("NEWTITLE"),df1("TITLE").as("TYPE"))
> val gr1 = jr2.groupBy(jr2("NEWEMPNO").as("EMPNO__0"),jr2("TYPE").as("TYPE__1")).agg(Map.empty[String,String]).toDF("EMPNO","TYPE")
> val temp1 = gr1.selectExpr("*","9 as KEY")
> val temp2 = df4.selectExpr("*","9 as KEY")
> val jr3 = temp1.join(temp2,temp1("KEY")===temp2("KEY"),"OUTER").select(temp1("EMPNO"),temp1("TYPE"),temp1("KEY"),temp2("`年月`"))
> jr3.show(200)
> {code}
> The error message is as follows:
> {code:java}
> java.lang.UnsupportedOperationException: BroadcastExchange does not support the execute() code path.
>   at org.apache.spark.sql.errors.QueryExecutionErrors$.executeCodePathUnsupportedError(QueryExecutionErrors.scala:1655)
>   at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecute(BroadcastExchangeExec.scala:203)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
>   at org.apache.spark.sql.execution.adaptive.QueryStageExec.doExecute(QueryStageExec.scala:119)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:526)
>   at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:454)
>   at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:453)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:497)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:50)
>   at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:50)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:750)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:184)
>   at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:180)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:325)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:443)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:338)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:366)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
>   at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:806)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:765)
> {code}
> When I switch to version 3.0.1 or set spark.sql.adaptive.enabled=false, the code runs successfully.
> {code:java}
> +- ShuffleQueryStage 3
>    *(4) LocalLimit 200
>    +- *(4) Project [cast(null as string) AS EMPNO#8304, cast(null as string) AS TYPE#8319, 年月#8338, KEY#8341]
>       +- BroadcastQueryStage 2
>          +- BroadcastExchange IdentityBroadcastMode, [id=#9619]
>             +- *(3) Project [年月#8338, 10 AS KEY#8341]
>                +- *(3) ColumnarToRow
>                   +- FileScan parquet [年月#8338] Batched: true, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex(1 paths)[/table/pd39b547fb6c24382861af92], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<年月:string>
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org