You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2019/01/24 10:19:27 UTC
[spark] branch branch-2.4 updated: [SPARK-26680][SQL] Eagerly
create inputVars while conditions are appropriate
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new e8e9b11 [SPARK-26680][SQL] Eagerly create inputVars while conditions are appropriate
e8e9b11 is described below
commit e8e9b110c7076139f551797ef7b74c7953b66f24
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Thu Jan 24 11:18:08 2019 +0100
[SPARK-26680][SQL] Eagerly create inputVars while conditions are appropriate
## What changes were proposed in this pull request?
When a user passes a Stream to groupBy, ```CodegenSupport.consume``` ends up lazily generating ```inputVars``` from a Stream, since the field ```output``` will be a Stream. At the time ```output.zipWithIndex.map``` is called, the surrounding codegen state (presumably ```ctx.currentVars```/```ctx.INPUT_ROW```, which the method mutates shortly after) is still correct. However, because a Stream evaluates its elements lazily, by the time the map operation actually executes, that state has already been changed and is no longer appropriate. The closure used by the map operation also ends up holding a reference to the partially created ```inputVars``` itself, so forcing the Stream recurses into its own definition. As a result, a StackOverflowError occurs.
This PR ensures that ```inputVars``` is eagerly created while conditions are appropriate. It seems this was also an issue with the code path for creating ```inputVars``` from ```outputVars``` (SPARK-25767). I simply extended the solution for that code path to encompass both code paths.
## How was this patch tested?
SQL unit tests
new test
python tests
Closes #23617 from bersprockets/SPARK-26680_opt1.
Authored-by: Bruce Robbins <be...@gmail.com>
Signed-off-by: Herman van Hovell <hv...@databricks.com>
(cherry picked from commit d4a30fa9af81a8bbb50d75f495ca3787f68f10e4)
Signed-off-by: Herman van Hovell <hv...@databricks.com>
---
.../apache/spark/sql/execution/WholeStageCodegenExec.scala | 12 +++++++-----
.../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 9 +++++++++
2 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ded8dd3..7b01f61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -142,14 +142,11 @@ trait CodegenSupport extends SparkPlan {
* Note that `outputVars` and `row` can't both be null.
*/
final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
- val inputVars =
+ val inputVarsCandidate =
if (outputVars != null) {
assert(outputVars.length == output.length)
// outputVars will be used to generate the code for UnsafeRow, so we should copy them
- outputVars.map(_.copy()) match {
- case stream: Stream[ExprCode] => stream.force
- case other => other
- }
+ outputVars.map(_.copy())
} else {
assert(row != null, "outputVars and row cannot both be null.")
ctx.currentVars = null
@@ -159,6 +156,11 @@ trait CodegenSupport extends SparkPlan {
}
}
+ val inputVars = inputVarsCandidate match {
+ case stream: Stream[ExprCode] => stream.force
+ case other => other
+ }
+
val rowVar = prepareRowVar(ctx, row, outputVars)
// Set up the `currentVars` in the codegen context, as we generate the code of `inputVars`
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 09ad0fd..e03f084 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -330,4 +330,13 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
checkAnswer(abc, Row(1, "a"))
}
+
+ test("SPARK-26680: Stream in groupBy does not cause StackOverflowError") {
+ val groupByCols = Stream(col("key"))
+ val df = Seq((1, 2), (2, 3), (1, 3)).toDF("key", "value")
+ .groupBy(groupByCols: _*)
+ .max("value")
+
+ checkAnswer(df, Seq(Row(1, 3), Row(2, 3)))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org