Posted to issues@spark.apache.org by "Bruce Robbins (JIRA)" <ji...@apache.org> on 2019/01/22 04:12:00 UTC

[jira] [Updated] (SPARK-26680) StackOverflowError if Stream passed to groupBy

     [ https://issues.apache.org/jira/browse/SPARK-26680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruce Robbins updated SPARK-26680:
----------------------------------
    Description: 
This Java code results in a StackOverflowError:
{code:java}
List<Column> groupByCols = new ArrayList<>();
groupByCols.add(new Column("id1"));
scala.collection.Seq<Column> groupByColsSeq =
    JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
        .asScala().toSeq();
df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
{code}
The {{toSeq}} method above produces a Stream. Passing a Stream to groupBy results in the StackOverflowError. In fact, the error can be produced more easily in spark-shell:
{noformat}
scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
scala> val groupBySeq = Stream(col("id1"))
groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)
scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
...etc...
{noformat}
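As an aside, the laziness that bites here is easy to see outside of Spark: in Scala 2.12, {{toSeq}} on an iterator builds a {{Stream}}, while {{toList}} forces a strict collection up front. A minimal, non-Spark illustration of that difference (variable names here are made up for the example):
{code:scala}
import scala.collection.JavaConverters._

val javaCols = new java.util.ArrayList[String]()
javaCols.add("id1")

// toSeq on an iterator is lazy: it produces a Stream whose tail is only evaluated on demand
val lazySeq = javaCols.iterator.asScala.toSeq
println(lazySeq.getClass)    // class scala.collection.immutable.Stream$Cons

// forcing the collection up front (e.g. with toList) yields a strict List instead
val strictSeq = javaCols.iterator.asScala.toList
println(strictSeq.getClass)  // class scala.collection.immutable.$colon$colon
{code}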
This is due to the lazy nature of Streams. The method {{consume}} in {{CodegenSupport}} assumes that a map function will be eagerly evaluated:
{code:scala}
val inputVars =
        ctx.currentVars = null   // <== the closure cares about this
        ctx.INPUT_ROW = row
        output.zipWithIndex.map { case (attr, i) =>
          BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
        }
    // ... (intervening lines elided) ...
    ctx.currentVars = inputVars
    ctx.INPUT_ROW = null
    ctx.freshNamePrefix = parent.variablePrefix
    val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
{code}
The closure passed to the map function assumes {{ctx.currentVars}} will be set to null. But due to lazy evaluation, {{ctx.currentVars}} is set to something else by the time the closure is actually called. Worse yet, {{ctx.currentVars}} is set to the yet-to-be-evaluated {{inputVars}} stream. The closure uses {{ctx.currentVars}} (via the call to {{genCode(ctx)}}), so it ends up consuming the very data structure it is attempting to create.

You can recreate the problem in a vanilla Scala shell:
{code:scala}
scala> var p1: Seq[Any] = null
p1: Seq[Any] = null
scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
s: scala.collection.immutable.Stream[Any] = Stream(1, ?)
scala> p1 = s
p1: Seq[Any] = Stream(1, ?)
scala> s.foreach(println)
1
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
... etc ...
{code}
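For contrast, if the mapped collection is strict (or the Stream is forced before {{p1}} points at it), every closure runs while {{p1}} is still null and the self-reference never materializes. A minimal sketch of the same shell session using a strict List:
{code:scala}
var p1: Seq[Any] = null

// With a strict List, map evaluates each closure immediately, while p1 is still null,
// so the later assignment cannot feed the result back into its own construction.
val s = List(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }

p1 = s
s.foreach(println)   // prints 1 and 2; no StackOverflowError
{code}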
Possible fixes:
 - In {{Dataset.groupBy}}, we could ensure the passed Seq is a List before passing it to RelationalGroupedDataset (simply by changing {{cols.map(_.expr)}} to {{cols.toList.map(_.expr)}}).
 - In {{CodegenSupport.consume}}, we could ensure that the map function is eagerly evaluated (simply by moving the existing match statement to handle the result from either path of the if statement).
 - Something else that hasn't occurred to me (opinions welcome).
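Until one of those fixes lands, a caller-side workaround is simply to force the Stream before handing it to {{groupBy}}. A sketch against the spark-shell repro above (not part of any proposed patch):
{code:scala}
// Forcing the lazy Stream into a strict List before groupBy avoids the
// self-referential evaluation inside whole-stage codegen.
val groupBySeq = Stream(col("id1"))
df.groupBy(groupBySeq.toList: _*).max("id2").toDF("id1", "id2").show()
{code}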

  was:
This Java code results in a StackOverflowError:
{code:java}
List<Column> groupByCols = new ArrayList<>();
groupByCols.add(new Column("id1"));
scala.collection.Seq<Column> groupByColsSeq =
    JavaConverters.asScalaIteratorConverter(groupByCols.iterator())
        .asScala().toSeq();
df.groupBy(groupByColsSeq).max("id2").toDF("id1", "id2").show();
{code}
The {{toSeq}} method above produces a Stream. Passing a Stream to groupBy results in the StackOverflowError. In fact, the error can be produced more easily in spark-shell:
{noformat}
scala> val df = spark.read.schema("id1 int, id2 int").csv("testinput.csv")
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
scala> val groupBySeq = Stream(col("id1"))
groupBySeq: scala.collection.immutable.Stream[org.apache.spark.sql.Column] = Stream(id1, ?)
scala> df.groupBy(groupBySeq: _*).max("id2").toDF("id1", "id2").collect
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:418)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1171)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1161)
  at scala.collection.immutable.Stream.drop(Stream.scala:797)
  at scala.collection.immutable.Stream.drop(Stream.scala:204)
  at scala.collection.LinearSeqOptimized.apply(LinearSeqOptimized.scala:66)
  at scala.collection.LinearSeqOptimized.apply$(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.Stream.apply(Stream.scala:204)
  at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:45)
  at org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:138)
  at scala.Option.getOrElse(Option.scala:138)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:133)
  at org.apache.spark.sql.execution.CodegenSupport.$anonfun$consume$3(WholeStageCodegenExec.scala:159)
...etc...
{noformat}
This is due to the lazy nature of Streams. The method {{consume}} in {{CodegenSupport}} assumes that a map function will be eagerly evaluated:
{code:scala}
val inputVars =
        ctx.currentVars = null   // <== the closure cares about this
        ctx.INPUT_ROW = row
        output.zipWithIndex.map { case (attr, i) =>
          BoundReference(i, attr.dataType, attr.nullable).genCode(ctx)
        }
    // ... (intervening lines elided) ...
    ctx.currentVars = inputVars
    ctx.INPUT_ROW = null
    ctx.freshNamePrefix = parent.variablePrefix
    val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
{code}
The closure passed to the map function assumes {{ctx.currentVars}} will be set to null. But due to lazy evaluation, {{ctx.currentVars}} is set to something else by the time the closure is actually called. Worse yet, {{ctx.currentVars}} is set to the yet-to-be-evaluated {{inputVars}} stream. The closure uses {{ctx.currentVars}} (via the call to {{genCode(ctx)}}), so it ends up consuming the very data structure it is attempting to create.

You can recreate the problem in a vanilla Scala shell:
{code:scala}
scala> var p1: Seq[Any] = null
p1: Seq[Any] = null
scala> val s = Stream(1, 2).zipWithIndex.map { case (x, i) => if (p1 != null) p1(i) else x }
s: scala.collection.immutable.Stream[Any] = Stream(1, ?)
scala> p1 = s
p1: Seq[Any] = Stream(1, ?)
scala> s.foreach(println)
1
java.lang.StackOverflowError
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1166)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
  at scala.collection.immutable.Stream.$anonfun$map$1(Stream.scala:415)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1169)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1159)
... etc ...
{code}
Possible fixes:
 - In {{Dataset.groupBy}}, we could ensure the passed Seq is a List before passing it to RelationalGroupedDataset (simply by changing {{cols.map(_.expr)}} to {{cols.toList.map(_.expr)}}).
 - In {{CodegenSupport}}, we could ensure that the map function is eagerly evaluated (simply by adding ".toList" to the construct).
 - Something else that hasn't occurred to me (opinions welcome).


> StackOverflowError if Stream passed to groupBy
> ----------------------------------------------
>
>                 Key: SPARK-26680
>                 URL: https://issues.apache.org/jira/browse/SPARK-26680
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Bruce Robbins
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org