Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2019/04/19 06:07:00 UTC

[jira] [Resolved] (SPARK-27514) Empty window expression results in error in optimizer

     [ https://issues.apache.org/jira/browse/SPARK-27514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-27514.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 3.0.0

Issue resolved by pull request 24411
[https://github.com/apache/spark/pull/24411]

> Empty window expression results in error in optimizer
> -----------------------------------------------------
>
>                 Key: SPARK-27514
>                 URL: https://issues.apache.org/jira/browse/SPARK-27514
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Yifei Huang
>            Assignee: Yifei Huang
>            Priority: Major
>             Fix For: 3.0.0
>
>
> Currently, the optimizer will break on the following code:
> {code:java}
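> import java.util
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.sum
> import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
>
> // Two nullable columns; the DataFrame itself is empty.
> val schema = StructType(Seq(
>   StructField("colA", StringType, true),
>   StructField("colB", IntegerType, true)
> ))
> // Assumes a sqlContext is already in scope (e.g. from the test harness).
> var df = sqlContext.sparkSession.createDataFrame(new util.ArrayList[Row](), schema)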
> val w = Window.partitionBy("colA")
> df = df.withColumn("col1", sum("colB").over(w))
> df = df.withColumn("col3", sum("colB").over(w))
> df = df.withColumn("col4", sum("col3").over(w))
> df = df.withColumn("col2", sum("col1").over(w))
> df = df.select("col2")
> df.explain(true)
> {code}
> with the following stacktrace:
> {code:java}
> next on empty iterator
> java.util.NoSuchElementException: next on empty iterator
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
> at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
> at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
> at scala.collection.IterableLike$class.head(IterableLike.scala:107)
> at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
> at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
> at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
> at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:803)
> at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:798)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:281)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
> at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:798)
> at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:797)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:109)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:106)
> at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> at scala.collection.immutable.List.foldLeft(List.scala:84)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:106)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:98)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:98)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
> at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:76)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
> at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
> at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:316)
> at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:135)
> at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:142)
> at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:168)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset.explain(Dataset.scala:492)
> at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2$Tests$1.test(TestSparkSessionSuite.scala:51)
> at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply$mcV$sp(TestSparkSessionSuite.scala:56)
> at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
> at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
> at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.Transformer.apply(Transformer.scala:22)
> at org.scalatest.Transformer.apply(Transformer.scala:20)
> at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
> at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
> at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
> at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
> at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
> at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
> at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
> at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
> at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
> at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
> at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
> at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
> at org.scalatest.Suite$class.run(Suite.scala:1147)
> at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
> at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
> at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
> at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
> at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
> at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
> at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
> at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
> at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
> at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
> at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
> at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
> at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
> at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
> at org.scalatest.tools.Runner$.run(Runner.scala:850)
> at org.scalatest.tools.Runner.run(Runner.scala)
> at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
> at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
> {code}
> We believe this is caused by the move of this code [https://github.com/apache/spark/pull/23343/files#diff-a636a87d8843eeccca90140be91d4fafL606] from the ColumnPruning rule to the RemoveNoopOperators rule, which is applied later. As a result, Window nodes with no window expressions are no longer removed before the CollapseWindow rule runs, so CollapseWindow can encounter an empty windowExpressions list and fail when it takes the head of it.
> There are a few ways we could go about solving this:
>  # Check whether windowExpressions is empty in the CollapseWindow rule. If it is empty, don't collapse the windows, and let the later RemoveNoopOperators rule remove the node.
>  # Add the removal of empty window expressions back into ColumnPruning.
>  # Move the RemoveNoopOperators rule after ColumnPruning and before CollapseWindow.
> Option 2 feels like a hack, and I don't have enough familiarity with the Optimizer to understand all the implications of reordering rules as in option 3.
>  
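Option 1 from the list above can be illustrated with a small, self-contained sketch. Everything in it is an assumption made for illustration: Plan, Relation, Window and the functionType helper are hypothetical toy stand-ins, not Spark's Catalyst classes, and the sketch leaves out the ordering-spec and column-reference checks a real rule would need. It only shows the idea of testing that both expression lists are non-empty before calling head on them, and leaving the empty node for a later rule to remove.

```scala
object CollapseWindowSketch {
  // Toy stand-ins for logical plan nodes (hypothetical, not Spark's classes).
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Window(
      windowExpressions: Seq[String],
      partitionSpec: Seq[String],
      child: Plan) extends Plan

  // Hypothetical classifier standing in for a "same kind of window function" check.
  def functionType(expr: String): String =
    if (expr.startsWith("py_")) "python" else "sql"

  // Bottom-up rewrite: transform the child first, then try to merge
  // this Window with a Window directly beneath it.
  def collapse(plan: Plan): Plan = plan match {
    case Window(we1, ps1, child) =>
      collapse(child) match {
        case Window(we2, ps2, grandChild)
            if ps1 == ps2 &&
              // The guard: only call .head once both lists are known to be non-empty.
              we1.nonEmpty && we2.nonEmpty &&
              functionType(we1.head) == functionType(we2.head) =>
          Window(we2 ++ we1, ps1, grandChild)
        // Otherwise keep the node as it is, including an empty Window.
        case other => Window(we1, ps1, other)
      }
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val base = Relation("t")
    val empty = Window(Seq.empty, Seq("colA"), Window(Seq("sum(colB)"), Seq("colA"), base))
    // Without the nonEmpty checks, we1.head would throw NoSuchElementException here.
    println(collapse(empty))
  }
}
```

With the guard in place, a Window whose expression list is empty passes through unchanged, while two adjacent non-empty Windows with the same partitioning are still merged into one.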


