You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2019/01/24 10:19:27 UTC

[spark] branch branch-2.4 updated: [SPARK-26680][SQL] Eagerly create inputVars while conditions are appropriate

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new e8e9b11  [SPARK-26680][SQL] Eagerly create inputVars while conditions are appropriate
e8e9b11 is described below

commit e8e9b110c7076139f551797ef7b74c7953b66f24
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Thu Jan 24 11:18:08 2019 +0100

    [SPARK-26680][SQL] Eagerly create inputVars while conditions are appropriate
    
    ## What changes were proposed in this pull request?
    
    When a user passes a Stream to groupBy, ```CodegenSupport.consume``` ends up lazily generating ```inputVars``` from a Stream, since the field ```output``` will be a Stream. At the time ```output.zipWithIndex.map``` is called, conditions are correct. However, by the time the map operation actually executes, conditions are no longer appropriate. The closure used by the map operation ends up using a reference to the partially created ```inputVars```. As a result, a StackOverflowError occurs.
    
    This PR ensures that ```inputVars``` is eagerly created while conditions are appropriate. It seems this was also an issue with the code path for creating ```inputVars``` from ```outputVars``` (SPARK-25767). I simply extended the solution for that code path to encompass both code paths.
    
    ## How was this patch tested?
    
    SQL unit tests
    new test
    python tests
    
    Closes #23617 from bersprockets/SPARK-26680_opt1.
    
    Authored-by: Bruce Robbins <be...@gmail.com>
    Signed-off-by: Herman van Hovell <hv...@databricks.com>
    (cherry picked from commit d4a30fa9af81a8bbb50d75f495ca3787f68f10e4)
    Signed-off-by: Herman van Hovell <hv...@databricks.com>
---
 .../apache/spark/sql/execution/WholeStageCodegenExec.scala   | 12 +++++++-----
 .../apache/spark/sql/execution/WholeStageCodegenSuite.scala  |  9 +++++++++
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ded8dd3..7b01f61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -142,14 +142,11 @@ trait CodegenSupport extends SparkPlan {
    * Note that `outputVars` and `row` can't both be null.
    */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
-    val inputVars =
+    val inputVarsCandidate =
       if (outputVars != null) {
         assert(outputVars.length == output.length)
         // outputVars will be used to generate the code for UnsafeRow, so we should copy them
-        outputVars.map(_.copy()) match {
-          case stream: Stream[ExprCode] => stream.force
-          case other => other
-        }
+        outputVars.map(_.copy())
       } else {
         assert(row != null, "outputVars and row cannot both be null.")
         ctx.currentVars = null
@@ -159,6 +156,11 @@ trait CodegenSupport extends SparkPlan {
         }
       }
 
+    val inputVars = inputVarsCandidate match {
+      case stream: Stream[ExprCode] => stream.force
+      case other => other
+    }
+
     val rowVar = prepareRowVar(ctx, row, outputVars)
 
     // Set up the `currentVars` in the codegen context, as we generate the code of `inputVars`
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 09ad0fd..e03f084 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -330,4 +330,13 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(abc, Row(1, "a"))
   }
+
+  test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
+    val groupByCols = Stream(col("key"))
+    val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
+      .groupBy(groupByCols: _*)
+      .max("value")
+
+    checkAnswer(df, Seq(Row(1, 3), Row(2, 3)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org