Posted to issues@spark.apache.org by "Shawn Smith (Jira)" <ji...@apache.org> on 2021/11/06 01:57:00 UTC

[jira] [Created] (SPARK-37222) Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures

Shawn Smith created SPARK-37222:
-----------------------------------

             Summary: Max iterations reached in Operator Optimization w/left_anti or left_semi join and nested structures
                 Key: SPARK-37222
                 URL: https://issues.apache.org/jira/browse/SPARK-37222
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 3.2.0, 3.1.2
         Environment: I've reproduced the error on Spark 3.1.2, 3.2.0, and with the current branch-3.2 HEAD (git commit 966c90c0b5) as of November 5, 2021.

The problem does not occur with Spark 3.0.1.

 
            Reporter: Shawn Smith


The query optimizer never reaches a fixed point when optimizing the query below. This manifests as a warning:

> WARN: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.

But raising {{spark.sql.optimizer.maxIterations}}, as the message suggests, won't help. The actual problem is that the optimizer makes no progress from one iteration to the next and gets stuck in a loop.

In practice, Spark logs a warning but continues on and appears to execute the query successfully, albeit perhaps sub-optimally.

To reproduce, paste the following into the Spark shell. Because the snippet sets the {{spark.testing}} system property, Spark 3.1.2 and 3.2.0 throw an exception instead of logging the warning. Spark 3.0.1 does not:
{noformat}
case class Nested(b: Boolean, n: Long)
case class Table(id: String, nested: Nested)
case class Identifier(id: String)

locally {
  System.setProperty("spark.testing", "true") // Fail instead of logging a warning
  val df = List.empty[Table].toDS.cache()
  val ids = List.empty[Identifier].toDS.cache()
  df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
    .select('id, 'nested("n"))
    .explain()
}
{noformat}
Looking at the query plan as the optimizer iterates in {{RuleExecutor.execute()}}, here's an example of the plan after 49 iterations:
{noformat}
Project [id#2, _gen_alias_108#108L AS nested.n#28L]
+- Join LeftAnti, (id#2 = id#18)
   :- Project [id#2, nested#3.n AS _gen_alias_108#108L]
   :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- LocalTableScan <empty>, [id#2, nested#3]
   +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan <empty>, [id#18]
{noformat}
And here's the plan after one more iteration. The only change is the generated alias for the nested field: "{{_gen_alias_108#108L}}" has become "{{_gen_alias_109#109L}}".
{noformat}
Project [id#2, _gen_alias_109#109L AS nested.n#28L]
+- Join LeftAnti, (id#2 = id#18)
   :- Project [id#2, nested#3.n AS _gen_alias_109#109L]
   :  +- InMemoryRelation [id#2, nested#3], StorageLevel(disk, memory, deserialized, 1 replicas)
   :        +- LocalTableScan <empty>, [id#2, nested#3]
   +- InMemoryRelation [id#18], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- LocalTableScan <empty>, [id#18]
{noformat}
The optimizer continues looping and tweaking the alias until it hits the max iteration count and bails out.
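
To illustrate why a larger iteration limit cannot help, here is a toy model of a run-to-fixed-point loop in plain Scala. It is only a sketch and is not Spark's {{RuleExecutor}}: {{Plan}}, {{FreshAliasRule}}, and {{runToFixedPoint}} are hypothetical names invented for this example, and the "plan" is reduced to a single alias string. It assumes, as the plans above suggest, that each pass replaces the alias with a freshly numbered one.

```scala
// Toy model only: these types are hypothetical and are not Catalyst classes.
object FixedPointSketch {
  // The "plan" is reduced to the generated alias used for the nested field.
  final case class Plan(alias: String)

  // A rule that emits a freshly numbered alias on every call, mimicking the
  // _gen_alias_108 -> _gen_alias_109 change between consecutive iterations.
  final class FreshAliasRule(start: Int) {
    private var next = start
    def apply(plan: Plan): Plan = {
      val renamed = Plan(s"_gen_alias_$next")
      next += 1
      renamed
    }
  }

  // Apply the rule until the plan stops changing or maxIterations is reached.
  // Returns (final plan, iterations used, whether a fixed point was reached).
  def runToFixedPoint(
      rule: Plan => Plan,
      initial: Plan,
      maxIterations: Int): (Plan, Int, Boolean) = {
    @annotation.tailrec
    def loop(current: Plan, iteration: Int): (Plan, Int, Boolean) =
      if (iteration >= maxIterations) (current, iteration, false)
      else {
        val next = rule(current)
        if (next == current) (current, iteration + 1, true)
        else loop(next, iteration + 1)
      }
    loop(initial, 0)
  }

  def main(args: Array[String]): Unit = {
    // With the fresh-alias rule, every limit is exhausted without converging.
    for (limit <- Seq(100, 1000)) {
      val rule = new FreshAliasRule(start = 1)
      val (plan, used, converged) =
        runToFixedPoint(p => rule(p), Plan("nested.n"), limit)
      println(s"limit=$limit used=$used converged=$converged last=${plan.alias}")
    }

    // For contrast, a rule that always produces the same alias converges quickly.
    val stable: Plan => Plan = _ => Plan("_gen_alias_0")
    val (_, used, converged) = runToFixedPoint(stable, Plan("nested.n"), 100)
    println(s"stable rule: used=$used converged=$converged")
  }
}
```

In this model the fresh-alias rule uses up whatever limit it is given, because each pass yields a plan that differs from the previous one only by the alias number. A rule whose output is stable reaches a fixed point on the second pass.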

Here's an example that includes a stack trace:
{noformat}
$ bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.12)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

case class Nested(b: Boolean, n: Long)
case class Table(id: String, nested: Nested)
case class Identifier(id: String)

locally {
  System.setProperty("spark.testing", "true") // Fail instead of logging a warning
  val df = List.empty[Table].toDS.cache()
  val ids = List.empty[Identifier].toDS.cache()
  df.join(ids, Seq("id"), "left_anti") // also fails with "left_semi"
    .select('id, 'nested("n"))
    .explain()
}

// Exiting paste mode, now interpreting.

java.lang.RuntimeException: Max iterations (100) reached for batch Operator Optimization before Inferring Filters, please set 'spark.sql.optimizer.maxIterations' to a larger value.
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:246)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
  at scala.collection.immutable.List.foreach(List.scala:431)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:138)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:134)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
  at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
  at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:220)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:600)
  at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:220)
  at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:247)
  at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
  at org.apache.spark.sql.Dataset.$anonfun$explain$1(Dataset.scala:543)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.Dataset.explain(Dataset.scala:543)
  at org.apache.spark.sql.Dataset.explain(Dataset.scala:567)
  ... 47 elided
{noformat}


