Posted to commits@spark.apache.org by li...@apache.org on 2017/09/20 16:00:46 UTC
spark git commit: [SPARK-22076][SQL] Expand.projections should not be a Stream
Repository: spark
Updated Branches:
refs/heads/master e17901d6d -> ce6a71e01
[SPARK-22076][SQL] Expand.projections should not be a Stream
## What changes were proposed in this pull request?
Spark with Scala 2.10 fails with a group by cube:
```
spark.range(1).select($"id" as "a", $"id" as "b").write.partitionBy("a").mode("overwrite").saveAsTable("rollup_bug")
spark.sql("select 1 from rollup_bug group by rollup ()").show
```
It can be traced back to https://github.com/apache/spark/pull/15484 , which made `Expand.projections` a lazy `Stream` for group by cube.
In Scala 2.10, a `Stream` captures a lot of its surrounding state. In this case it captures the entire query plan, which has some unserializable parts.
This change also benefits the master branch, because it reduces the serialized size of `Expand.projections`.
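The effect of materializing a lazy `Stream` can be sketched outside Spark. The snippet below is only an illustration, not the Spark code: `subsets0` and `subsets` are hypothetical stand-ins that work on strings, and the base case is deliberately a `Stream` so that the laziness is visible. It shows that `map` and `++` keep the result a `Stream`, while `toIndexedSeq` forces every element into a strict collection. It does not reproduce the serialization failure itself.

```scala
object StreamMaterializeSketch {
  // Hypothetical recursive helper. The base case is a Stream on purpose,
  // so `map` and `++` below produce another (lazy) Stream.
  def subsets0(items: List[String]): Seq[Seq[String]] = items match {
    case x :: xs =>
      val initial = subsets0(xs)
      initial.map(x +: _) ++ initial
    case Nil =>
      Stream(Seq.empty[String])
  }

  // Wrapper that forces the whole Stream into a strict IndexedSeq,
  // so no lazily evaluated tail is left behind.
  def subsets(items: List[String]): Seq[Seq[String]] =
    subsets0(items).toIndexedSeq

  def main(args: Array[String]): Unit = {
    val lazyResult = subsets0(List("a", "b"))
    val strictResult = subsets(List("a", "b"))
    println(lazyResult.isInstanceOf[Stream[_]])   // true
    println(strictResult.isInstanceOf[Stream[_]]) // false
    println(strictResult.size)                    // 4
  }
}
```

Keeping the recursive helper separate from a thin wrapper means the recursion is unchanged and the result is materialized exactly once, at the outermost call.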
## How was this patch tested?
Manually verified using Spark built with Scala 2.10.
Author: Wenchen Fan <we...@databricks.com>
Closes #19289 from cloud-fan/bug.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce6a71e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce6a71e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce6a71e0
Branch: refs/heads/master
Commit: ce6a71e013c403d0a3690cf823934530ce0ea5ef
Parents: e17901d
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Sep 20 09:00:43 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Sep 20 09:00:43 2017 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ce6a71e0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index db276fb..4535176 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -279,9 +279,15 @@ class Analyzer(
* We need to get all of its subsets for a given GROUPBY expression, the subsets are
* represented as sequence of expressions.
*/
- def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
+ def cubeExprs(exprs: Seq[Expression]): Seq[Seq[Expression]] = {
+ // `cubeExprs0` is recursive and returns a lazy Stream. Here we call `toIndexedSeq` to
+ // materialize it and avoid serialization problems later on.
+ cubeExprs0(exprs).toIndexedSeq
+ }
+
+ def cubeExprs0(exprs: Seq[Expression]): Seq[Seq[Expression]] = exprs.toList match {
case x :: xs =>
- val initial = cubeExprs(xs)
+ val initial = cubeExprs0(xs)
initial.map(x +: _) ++ initial
case Nil =>
Seq(Seq.empty)