Posted to commits@spark.apache.org by li...@apache.org on 2017/09/20 16:00:46 UTC

spark git commit: [SPARK-22076][SQL] Expand.projections should not be a Stream

Repository: spark
Updated Branches:
  refs/heads/master e17901d6d -> ce6a71e01


[SPARK-22076][SQL] Expand.projections should not be a Stream

## What changes were proposed in this pull request?

Spark built with Scala 2.10 fails on a group-by-cube/rollup query, for example:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```

The failure can be traced back to https://github.com/apache/spark/pull/15484, which made `Expand.projections` a lazy `Stream` for group by cube.

In Scala 2.10, a `Stream` captures a lot of enclosing state in its unevaluated tail thunks, and in this case it captures the entire query plan, which has some non-serializable parts.
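As an illustration only (this is a standalone sketch, not Spark code; `NonSerializableOwner`, `lazySubsets`, and `StreamCaptureDemo` are made-up names), a recursively built `Stream` keeps unevaluated tail thunks, and depending on the Scala version and its closure encoding those thunks can capture the enclosing instance; forcing the `Stream` into a strict collection drops the thunks:
```
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stands in for something (like a query plan node) that is not serializable.
class NonSerializableOwner {
  // Builds the result lazily: each tail is an unevaluated thunk created inside
  // this instance, so (depending on the Scala version and closure encoding)
  // it can hold a reference back to `this`.
  def lazySubsets(xs: List[Int]): Stream[List[Int]] = xs match {
    case h :: t =>
      val rest = lazySubsets(t)
      rest.map(h :: _) #::: rest
    case Nil => Stream(Nil)
  }
}

object StreamCaptureDemo {
  // Java-serialized size of an object, or -1 if it turns out not to be serializable.
  def serializedSize(obj: AnyRef): Int =
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(obj)
      out.close()
      bytes.size()
    } catch {
      case _: NotSerializableException => -1
    }

  def main(args: Array[String]): Unit = {
    val owner = new NonSerializableOwner
    val lazyResult = owner.lazySubsets(List(1, 2, 3))

    // Forcing the Stream into a strict collection evaluates all elements and
    // drops the tail thunks, so nothing created inside `owner` stays reachable
    // from the result.
    val materialized = lazyResult.toIndexedSeq

    println(s"lazy Stream:  ${serializedSize(lazyResult)}")
    println(s"materialized: ${serializedSize(materialized)}")
  }
}
```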

This change also benefits the master branch, as it reduces the serialized size of `Expand.projections`.

## How was this patch tested?

Manually verified with Spark built with Scala 2.10.

Author: Wenchen Fan <we...@databricks.com>

Closes #19289 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce6a71e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce6a71e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce6a71e0

Branch: refs/heads/master
Commit: ce6a71e013c403d0a3690cf823934530ce0ea5ef
Parents: e17901d
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Sep 20 09:00:43 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 20 09:00:43 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce6a71e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index db276fb..4535176 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -279,9 +279,15 @@ class Analyzer(
      *  We need to get all of its subsets for a given GROUPBY expression, the subsets are
      *  represented as sequence of expressions.
      */
-    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
+    def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
+      // `cubeExprs0` is recursive and returns a lazy Stream. Here we call `toIndexedSeq` to
+      // materialize it and avoid serialization problems later on.
+      cubeExprs0(exprs).toIndexedSeq
+    }
+
+    def cubeExprs0(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
       case x :: xs =>
-        val initial = cubeExprs(xs)
+        val initial = cubeExprs0(xs)
         initial.map(x +: _) ++ initial
       case Nil =>
         Seq(Seq.empty)

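As a side note, the recursion above is a standard power-set enumeration. A minimal standalone sketch of the same shape, specialized to `String` instead of `Expression` (the names `CubeExprsSketch` and `subsets` are made up for illustration), shows the grouping sets it produces:
```
object CubeExprsSketch {
  // Same recursion as `cubeExprs0` above, specialized to String.
  def subsets(exprs: List[String]): Seq[List[String]] = exprs match {
    case x :: xs =>
      val initial = subsets(xs)
      initial.map(x :: _) ++ initial
    case Nil => Seq(Nil)
  }

  def main(args: Array[String]): Unit = {
    // For CUBE(a, b) this enumerates all 2^n grouping sets, most specific first:
    // List(a, b), List(a), List(b), List()
    subsets(List("a", "b")).foreach(println)
  }
}
```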
