You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yifei Huang (JIRA)" <ji...@apache.org> on 2019/04/18 22:58:00 UTC

[jira] [Created] (SPARK-27514) Empty window expression results in error in optimizer

Yifei Huang created SPARK-27514:
-----------------------------------

             Summary: Empty window expression results in error in optimizer
                 Key: SPARK-27514
                 URL: https://issues.apache.org/jira/browse/SPARK-27514
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Yifei Huang


Currently, the optimizer will break on the following code:
{code:java}
val schema = StructType(Seq(
  StructField("colA", StringType, true),
  StructField("colB", IntegerType, true)
))

var df = sqlContext.sparkSession.createDataFrame(new util.ArrayList[Row](), schema)
val w = Window.partitionBy("colA")
df = df.withColumn("col1", sum("colB").over(w))
df = df.withColumn("col3", sum("colB").over(w))
df = df.withColumn("col4", sum("col3").over(w))
df = df.withColumn("col2", sum("col1").over(w))
df = df.select("col2")
df.explain(true)
{code}
with the following stacktrace:
{code:java}
next on empty iterator
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:803)
at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$$anonfun$apply$15.applyOrElse(Optimizer.scala:798)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:282)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:281)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:191)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:328)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:279)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:798)
at org.apache.spark.sql.catalyst.optimizer.CollapseWindow$.apply(Optimizer.scala:797)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:109)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:106)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:106)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:98)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:98)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$executeAndTrack$1.apply(RuleExecutor.scala:77)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:76)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$optimizedPlan$1.apply(QueryExecution.scala:74)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$org$apache$spark$sql$execution$QueryExecution$$writePlans$3.apply(QueryExecution.scala:135)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:316)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:135)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:142)
at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:168)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.explain(Dataset.scala:492)
at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2$Tests$1.test(TestSparkSessionSuite.scala:51)
at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply$mcV$sp(TestSparkSessionSuite.scala:56)
at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
at org.apache.spark.sql.test.TestSparkSessionSuite$$anonfun$2.apply(TestSparkSessionSuite.scala:35)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28){code}
We believe this has to do with the removal of this [https://github.com/apache/spark/pull/23343/files#diff-a636a87d8843eeccca90140be91d4fafL606] from the ColumnPruning rule to the RemoveNoopOperators rule, which gets applied later. This means that you're not removing empty Window expression nodes before you run the CollapseWindow rule, which would result in empty window expressions when applying the CollapseWindow rule.

There's a few ways we could go about solving this:
 # Check for whether the windowExpressions is empty in the CollapseWindow rule. If it's empty, don't collapse the windows, and let the later RemoveNoopOperators rule remove it.
 # Add the removal of empty window expressions back into ColumnPruning
 # Move the RemoveNoopOperators rule after ColumnPruning and before CollapseWindow.

#2 feels like a hack, and I'm don't have enough familiarity with the Optimizer to understand all the implications of reordering rules.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org