Posted to issues@spark.apache.org by "Nicholas Chammas (Jira)" <ji...@apache.org> on 2022/04/26 15:45:00 UTC

[jira] [Comment Edited] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

    [ https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528233#comment-17528233 ] 

Nicholas Chammas edited comment on SPARK-37222 at 4/26/22 3:44 PM:
-------------------------------------------------------------------

I've found a helpful log setting that causes Spark to print out detailed information about how exactly a plan is transformed during optimization:
{code:java}
spark.conf.set("spark.sql.planChangeLog.level", "warn")
{code}
Here's the log generated by enabling this setting and running Shawn's example: [^plan-log.log]
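
The log can get very large, so it may help to narrow it to the rules and batch of interest. The following is a sketch for a Spark 3.1+ spark-shell session where {{spark}} is already defined. The {{spark.sql.planChangeLog.rules}} and {{spark.sql.planChangeLog.batches}} keys are internal settings that I believe sit alongside the level setting, so treat them as assumptions and check them against the {{SQLConf}} of your Spark version. The rule names are assumed to be fully qualified class names.

```scala
// Assumes a running spark-shell (Spark 3.1+), where `spark` is predefined.
// Log every plan change at WARN so it appears with the default logging config.
spark.conf.set("spark.sql.planChangeLog.level", "warn")

// Assumption: limits the log to the listed rules (comma-separated class names).
spark.conf.set(
  "spark.sql.planChangeLog.rules",
  Seq(
    "org.apache.spark.sql.catalyst.optimizer.ColumnPruning",
    "org.apache.spark.sql.catalyst.optimizer.FoldablePropagation",
    "org.apache.spark.sql.catalyst.optimizer.RemoveNoopOperators",
    "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin",
    "org.apache.spark.sql.catalyst.optimizer.CollapseProject"
  ).mkString(","))

// Assumption: limits the log to the batch named in the warning message.
spark.conf.set(
  "spark.sql.planChangeLog.batches",
  "Operator Optimization before Inferring Filters")
```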

To confirm what Shawn noted in his comment above, it looks like the chain of events that results in a loop is as follows:
 # ColumnPruning
 # FoldablePropagation _<loop starts here>_
 # RemoveNoopOperators
 # PushDownLeftSemiAntiJoin
 # ColumnPruning
 # CollapseProject
 # _<back to the start of the loop with FoldablePropagation>_
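
To check a sequence like the one above mechanically, one option is to look for a tail that repeats back to back. The following is a minimal sketch, not part of Spark: {{RuleCycle}} and {{repeatingTail}} are hypothetical names, and the rule sequence is typed in by hand from the list above rather than parsed from the log.

```scala
object RuleCycle {
  /** Returns the shortest suffix of `applied` that occurs twice in a row at the
    * end of the sequence, or None if the tail does not repeat. */
  def repeatingTail(applied: Seq[String]): Option[Seq[String]] = {
    val n = applied.length
    (1 to n / 2)
      .find(p => applied.slice(n - p, n) == applied.slice(n - 2 * p, n - p))
      .map(p => applied.takeRight(p))
  }

  // Rule order copied from the list above: the loop body as described there.
  val loop: Seq[String] = Seq(
    "FoldablePropagation", "RemoveNoopOperators", "PushDownLeftSemiAntiJoin",
    "ColumnPruning", "CollapseProject")

  // One lead-in ColumnPruning, then two full passes through the loop.
  val applied: Seq[String] = "ColumnPruning" +: (loop ++ loop)

  def main(args: Array[String]): Unit =
    println(repeatingTail(applied).map(_.mkString(" -> ")).getOrElse("no repeating tail"))
}
```

With the hand-typed sequence, the detected tail is the five-rule loop starting at FoldablePropagation. A real check would first have to extract the applied rule names from the plan change log.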

The problem appears to be that ColumnPruning inserts Project operators that CollapseProject, RemoveNoopOperators, and PushDownLeftSemiAntiJoin then remove in succession.

These rules go back and forth, undoing each other's work, until {{spark.sql.optimizer.maxIterations}} is exhausted.
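
To illustrate why raising the iteration limit cannot help, here is a toy model of a fixed-point executor. It is a sketch only and not Spark's {{RuleExecutor}}: {{Plan}}, {{ToyOptimizer}}, and the two rules are hypothetical stand-ins. One rule removes a Project, the other re-inserts it with a fresh alias ID, so the plan at the end of each iteration never equals the plan at the start.

```scala
object FixedPointSketch {
  // Toy plan: the only state is the ID of the generated alias, if a Project exists.
  final case class Plan(aliasId: Option[Int])

  final class ToyOptimizer(maxIterations: Int) {
    // Counter standing in for fresh expression IDs (hypothetical).
    private var nextId = 0
    private def freshId(): Int = { nextId += 1; nextId }

    // Stand-in for the rules that remove the Project.
    private val collapse: Plan => Plan = _ => Plan(None)
    // Stand-in for the rule that re-inserts a Project under a new alias.
    private val prune: Plan => Plan =
      p => if (p.aliasId.isEmpty) Plan(Some(freshId())) else p

    private val rules: Seq[Plan => Plan] = Seq(collapse, prune)

    /** Runs the rules until the plan stops changing or the limit is hit.
      * Returns (final plan, iterations used, whether a fixed point was reached). */
    def execute(start: Plan): (Plan, Int, Boolean) = {
      var current = start
      var iteration = 0
      var converged = false
      while (!converged && iteration < maxIterations) {
        val next = rules.foldLeft(current)((p, rule) => rule(p))
        iteration += 1
        converged = next == current
        current = next
      }
      (current, iteration, converged)
    }
  }

  def main(args: Array[String]): Unit = {
    println(new ToyOptimizer(100).execute(Plan(Some(0))))
    println(new ToyOptimizer(1000).execute(Plan(Some(0))))
  }
}
```

In this model both runs use every available iteration without converging, and only the alias ID differs between them. That mirrors the {{_gen_alias_108}} to {{_gen_alias_109}} change in the issue description.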


was (Author: nchammas):
I've found a helpful log setting that causes Spark to print out detailed information about how exactly a plan is transformed during optimization:
{code:java}
spark.conf.set("spark.sql.planChangeLog.level", "warn")
{code}
Here's the log generated by enabling this setting and running Shawn's example: [^plan-log.log]

To confirm what Shawn noted in his comment above, it looks like the chain of events that results in a loop is as follows:
 # PushDownLeftSemiAntiJoin
 # ColumnPruning
 # CollapseProject
 # FoldablePropagation
 # RemoveNoopOperators
 # <back to the start of the loop with PushDownLeftSemiAntiJoin>

What seems to be the problem is that:
 * ColumnPruning inserts a couple of Project operators which are then removed by CollapseProject.
 * CollapseProject in turn pushes up the left anti-join which is then pushed down again by PushDownLeftSemiAntiJoin.

These three rules go back and forth, undoing each other's work, until {{spark.sql.optimizer.maxIterations}} is exhausted.

> Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37222
>                 URL: https://issues.apache.org/jira/browse/SPARK-37222
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.1.2, 3.2.0, 3.2.1
>         Environment: I've reproduced the error on Spark 3.1.2, 3.2.0, and with the current branch-3.2 HEAD (git commit 966c90c0b5) as of November 5, 2021.
> The problem does not occur with Spark 3.0.1.
>  
>            Reporter: Shawn Smith
>            Priority: Major
>         Attachments: plan-log.log
>
>
> The query optimizer never reaches a fixed point when optimizing the query below. This manifests as a warning:
> > WARN: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
> But the suggested fix won't help. The actual problem is that the optimizer fails to make progress on each iteration and gets stuck in a loop.
> In practice, Spark logs a warning but continues on and appears to execute the query successfully, albeit perhaps sub-optimally.
> To reproduce, paste the following into the Spark shell. It throws an exception on Spark 3.1.2 and 3.2.0, but not on 3.0.1:
> {noformat}
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> {noformat}
> Looking at the query plan as the optimizer iterates in {{RuleExecutor.execute()}}, here's an example of the plan after 49 iterations:
> {noformat}
> Project [id#2, _gen_alias_108#108L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_108#108L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> And here's the plan after one more iteration. The only change is the generated alias for the nested field: "{{_gen_alias_108#108L}}" has become "{{_gen_alias_109#109L}}".
> {noformat}
> Project [id#2, _gen_alias_109#109L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_109#109L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> The optimizer continues looping and tweaking the alias until it hits the max iteration count and bails out.
> Here's an example that includes a stack trace:
> {noformat}
> $ bin/spark-shell
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
>       /_/
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> // Exiting paste mode, now interpreting.
> java.lang.RuntimeException: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:246)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
>   at scala.collection.immutable.List.foreach(List.scala:431)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
>   at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
>   at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:220)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:600)
>   at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:220)
>   at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:247)
>   at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
>   at org.apache.spark.sql.Dataset.$anonfun$explain$1(Dataset.scala:543)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:543)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:567)
>   ... 47 elided
> {noformat}


