Posted to issues@spark.apache.org by "Nicholas Chammas (Jira)" <ji...@apache.org> on 2022/04/25 14:21:00 UTC

[jira] [Commented] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

    [ https://issues.apache.org/jira/browse/SPARK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17527527#comment-17527527 ] 

Nicholas Chammas commented on SPARK-37222:
------------------------------------------

Thanks for the detailed report, [~ssmith]. I am hitting this issue as well on Spark 3.2.1, and your minimal test case reproduces it for me.

How did you break down the optimization into its individual steps like that? That was very helpful.
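
In case it helps others poke at this, one way I know of to watch the plan change rule-by-rule is the plan change log (assuming the {{spark.sql.planChangeLog.*}} configs available since 3.1; I don't know whether that's what you used):
{code:java}
// Sketch only: log each rule's effect on the plan at WARN so it is visible
// with the default log4j settings.
spark.conf.set("spark.sql.planChangeLog.level", "WARN")
// Optionally restrict the output to the rule suspected of looping.
spark.conf.set(
  "spark.sql.planChangeLog.rules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin"
){code}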

I was able to use your breakdown to work around the issue by excluding {{PushDownLeftSemiAntiJoin}}:
{code:java}
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin"
){code}
If I run that before executing the problematic query (including your test case), it seems to avoid the problem.
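
The same exclusion can also be applied at launch time rather than per session, e.g.:
{noformat}
bin/spark-shell --conf "spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin"
{noformat}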

> Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37222
>                 URL: https://issues.apache.org/jira/browse/SPARK-37222
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.1.2, 3.2.0, 3.2.1
>         Environment: I've reproduced the error on Spark 3.1.2, 3.2.0, and with the current branch-3.2 HEAD (git commit 966c90c0b5) as of November 5, 2021.
> The problem does not occur with Spark 3.0.1.
>  
>            Reporter: Shawn Smith
>            Priority: Major
>
> The query optimizer never reaches a fixed point when optimizing the query below. This manifests as a warning:
> > WARN: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
> But the suggested fix won't help. The actual problem is that the optimizer fails to make progress on each iteration and gets stuck in a loop.
> In practice, Spark logs a warning but continues on and appears to execute the query successfully, albeit perhaps sub-optimally.
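> For reference, this is the knob the warning points at; shown only for context, since raising it just lets the optimizer spin longer before giving up rather than breaking the loop:
> {noformat}
> // Illustrative value only; the loop persists regardless of the limit.
> spark.conf.set("spark.sql.optimizer.maxIterations", "1000")
> {noformat}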
> To reproduce, paste the following into the Spark shell. With Spark 3.1.2 and 3.2.0, but not 3.0.1, it throws an exception:
> {noformat}
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> {noformat}
> Looking at the query plan as the optimizer iterates in {{RuleExecutor.execute()}}, here's an example of the plan after 49 iterations:
> {noformat}
> Project [id#2, _gen_alias_108#108L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_108#108L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> And here's the plan after one more iteration. The only change is the generated alias for the extracted nested field: "{{_gen_alias_108#108L}}" becomes "{{_gen_alias_109#109L}}".
> {noformat}
> Project [id#2, _gen_alias_109#109L AS nested.n#28L]
> +- Join LeftAnti, (id#2 = id#18)
>    :- Project [id#2, nested#3.n AS _gen_alias_109#109L]
>    :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
>    :        +- LocalTableScan <empty>, [id#2, nested#3]
>    +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- LocalTableScan <empty>, [id#18]
> {noformat}
> The optimizer continues looping and tweaking the alias until it hits the max iteration count and bails out.
> Here's an example that includes a stack trace:
> {noformat}
> $ bin/spark-shell
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
>       /_/
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12)
> Type in expressions to have them evaluated.
> Type :help for more information.
> scala> :paste
> // Entering paste mode (ctrl-D to finish)
> case class Nested(b: Boolean, n: Long)
> case class Table(id: String, nested: Nested)
> case class Identifier(id: String)
> locally {
>   System.setProperty("spark.testing", "true") // Fail instead of logging a warning
>   val df = List.empty[Table].toDS.cache()
>   val ids = List.empty[Identifier].toDS.cache()
>   df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
>     .select('id, 'nested("n"))
>     .explain()
> }
> // Exiting paste mode, now interpreting.
> java.lang.RuntimeException: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:246)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
>   at scala.collection.immutable.List.foreach(List.scala:431)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
>   at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
>   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
>   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
>   at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
>   at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
>   at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:220)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:600)
>   at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:220)
>   at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:247)
>   at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
>   at org.apache.spark.sql.Dataset.$anonfun$explain$1(Dataset.scala:543)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:543)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:567)
>   ... 47 elided
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org