Posted to issues@spark.apache.org by "Bruce Robbins (JIRA)" <ji...@apache.org> on 2019/01/22 04:12:00 UTC
[jira] [Updated] (SPARK-26680) StackOverflowError if Stream passed to groupBy
[ https://issues.apache.org/jira/browse/SPARK-26680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruce Robbins updated SPARK-26680:
----------------------------------
Description:
This Java code results in a StackOverflowError:
{code:java}
List<Column> groupByCols = new ArrayList<>();
groupByCols.add(new Column("id1"));
scala.collection.Seq<Column> groupByColsSeq =
JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
.asScala().toSeq();
df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
{code}
The {{toSeq}} method above produces a Stream. Passing a Stream to groupBy results in the StackOverflowError. In fact, the error can be produced more easily in spark-shell:
{noformat}
scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
scala> val groupBySeq = Stream(col("id1"))
groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)
scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
java.lang.StackOverflowError
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
at scala.collection.immutable.Stream.drop(Stream.scala:797)
at scala.collection.immutable.Stream.drop(Stream.scala:204)
at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
at scala.collection.immutable.Stream.apply(Stream.scala:204)
at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
at scala.Option.getOrElse(Option.scala:138)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
at scala.collection.immutable.Stream.drop(Stream.scala:797)
at scala.collection.immutable.Stream.drop(Stream.scala:204)
at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
at scala.collection.immutable.Stream.apply(Stream.scala:204)
at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
at scala.Option.getOrElse(Option.scala:138)
at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
...etc...
{noformat}
This is due to the lazy nature of Streams. The method {{consume}} in {{CodegenSupport}} assumes that a map function will be eagerly evaluated:
{code:java}
val inputVars =
    ctx.currentVars = null  // <== the closure cares about this
    ctx.INPUT_ROW = row
    output.zipWithIndex.map { case (attr, i) =>
      BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
    // ... (remaining lines elided)
ctx.currentVars = inputVars
ctx.INPUT_ROW = null
ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
{code}
The closure passed to the map function assumes {{ctx.currentVars}} is still null when it runs. With a lazily evaluated Stream, however, the closure for every element after the head runs later, by which time {{ctx.currentVars}} has been set to something else. Worse yet, it has been set to the yet-to-be-evaluated {{inputVars}} stream itself. The closure reads {{ctx.currentVars}} (via the call {{genCode(ctx)}}), so it ends up indexing into the very data structure it is trying to create.
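The ordering problem can be seen in isolation with a small stand-in for the codegen context. The sketch below is not Spark code: {{MockCtx}}, {{genVars}} and the generated strings are hypothetical names made up for illustration, and a fixed marker is assigned instead of {{inputVars}} so that the example cannot recurse. It only shows that a closure mapped over a List sees the state that was set before the map, while the same closure mapped over a Stream sees whatever the state is when each tail element is finally forced.

```scala
object LazyClosureDemo {
  // Hypothetical stand-in for the mutable codegen context; not Spark's CodegenContext.
  final class MockCtx { var currentVars: Seq[String] = null }

  // Mirrors the shape of the excerpt above: set state, map, then change the state.
  def genVars(ctx: MockCtx, output: Seq[String]): Seq[String] = {
    ctx.currentVars = null
    val inputVars = output.zipWithIndex.map { case (attr, i) =>
      // The closure expects currentVars to still be null when it runs.
      if (ctx.currentVars == null) s"row[$i]" else s"stale:$attr"
    }
    // A fixed marker rather than inputVars itself, so this sketch cannot recurse.
    ctx.currentVars = Seq("marker")
    inputVars
  }

  def main(args: Array[String]): Unit = {
    // Strict input: both elements are generated while currentVars is null.
    println(genVars(new MockCtx, List("id1", "id2")).toList)   // List(row[0], row[1])
    // Lazy input: only the head is generated before currentVars changes.
    println(genVars(new MockCtx, Stream("id1", "id2")).toList) // List(row[0], stale:id2)
  }
}
```

In the real method the marker is the stream itself, which is what turns the stale read into unbounded recursion.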
You can recreate the problem in a vanilla Scala shell:
{code:java}
scala> var p1: Seq[Any] = null
p1: Seq[Any] = null
scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
s: scala.collection.immutable.Stream[Any] = Stream(1, ?)
scala> p1 = s
p1: Seq[Any] = Stream(1, ?)
scala> s.foreach(println)
1
java.lang.StackOverflowError
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
... etc ...
{code}
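For contrast, here is a minimal sketch of the same shape with the stream forced before {{p1}} is reassigned. {{force}} is the standard {{Stream}} method that evaluates every element; the wrapper object and the name {{forcedRepro}} are hypothetical and exist only so the snippet compiles outside the REPL.

```scala
object ForcedRepro {
  // Same closure as the repro above, but every element is evaluated
  // while p1 is still null, so the closure never indexes into s.
  def forcedRepro(): List[Any] = {
    var p1: Seq[Any] = null
    val s = Stream(1, 2).zipWithIndex
      .map { case (x, i) => if (p1 != null) p1(i) else x }
      .force
    p1 = s
    s.toList
  }

  def main(args: Array[String]): Unit =
    println(forcedRepro()) // List(1, 2)
}
```

Because nothing is left to evaluate once {{p1}} points at {{s}}, traversing {{s}} no longer re-enters the closure.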
Possible fixes:
- In {{Dataset.groupBy}}, we could ensure the passed Seq is a List before passing it to RelationalGroupedDataset (simply by changing {{cols.map(_.expr)}} to {{cols.toList.map(_.expr)}}).
- In {{CodegenSupport.consume}}, we could ensure that the map function is eagerly evaluated (simply by moving the existing match statement to handle the result from either path of the if statement).
- Something else that hasn't occurred to me (opinions welcome).
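The first two options both amount to making the sequence strict at some boundary. A minimal sketch of the two shapes, using only the Scala standard library, might look like this. {{mapViaList}}, {{mapThenForce}} and {{forceIfStream}} are hypothetical helper names, and plain integers stand in for Spark's column and expression types; this is an illustration of the idea, not the actual patch.

```scala
object StrictBoundary {
  // Shape of the first option: copy into a List before mapping,
  // analogous to cols.toList.map(_.expr).
  def mapViaList[A, B](cols: Seq[A])(f: A => B): Seq[B] =
    cols.toList.map(f)

  // Shape of the second option: leave the input alone, but force the
  // result if it turned out to be a Stream.
  def forceIfStream[A](xs: Seq[A]): Seq[A] = xs match {
    case s: Stream[A] => s.force
    case other        => other
  }

  def mapThenForce[A, B](cols: Seq[A])(f: A => B): Seq[B] =
    forceIfStream(cols.map(f))

  def main(args: Array[String]): Unit = {
    var calls = 0
    val doubled = mapThenForce(Stream(1, 2, 3)) { x => calls += 1; x * 2 }
    // All three closure calls have happened by the time mapThenForce returns.
    println(s"calls=$calls result=${doubled.toList}") // calls=3 result=List(2, 4, 6)
  }
}
```

The first shape pays for a copy on every call; the second only does extra work when a Stream actually shows up.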
was:
The same description as above, except that the second possible fix read:
- In {{CodegenSupport}}, we could ensure that the map function is eagerly evaluated (simply by adding ".toList" to the construct).
> StackOverflowError if Stream passed to groupBy
> ----------------------------------------------
>
> Key: SPARK-26680
> URL: https://issues.apache.org/jira/browse/SPARK-26680
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Bruce Robbins
> Priority: Major