You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/12 07:58:38 UTC

spark git commit: [SPARK-14554][SQL] disable whole stage codegen if there are too many input columns

Repository: spark
Updated Branches:
  refs/heads/master 2d81ba542 -> 52a801124


[SPARK-14554][SQL] disable whole stage codegen if there are too many input columns

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/12047/files#diff-94a1f59bcc9b6758c4ca874652437634R529, we may split field expressions codes in `CreateExternalRow` to support wide table. However, the whole stage codegen framework doesn't support it, because the input for expressions is not always the input row, but can be `CodeGenContext.currentVars`, which doesn't work well with `CodeGenContext.splitExpressions`.

Actually we do have a check to guard against this cases, but it's incomplete, it only checks output fields.

This PR improves the whole stage codegen support check, to disable it if there are too many input fields, so that we can avoid splitting field expressions codes in `CreateExternalRow` for whole stage codegen.

TODO: Is it a better solution if we can make `CodeGenContext.currentVars` work well with `CodeGenContext.splitExpressions`?

## How was this patch tested?

new test in DatasetSuite.

Author: Wenchen Fan <we...@databricks.com>

Closes #12322 from cloud-fan/codegen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52a80112
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52a80112
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52a80112

Branch: refs/heads/master
Commit: 52a801124f429ab133f9a3867c1da6ebd8fa7d4e
Parents: 2d81ba5
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Apr 11 22:58:35 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Apr 11 22:58:35 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/WholeStageCodegen.scala    | 7 +++++--
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala    | 6 ++++++
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/52a80112/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index c4594f0..447dbe7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -446,8 +446,11 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
     case plan: CodegenSupport if plan.supportCodegen =>
       val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
       // the generated code will be huge if there are too many columns
-      val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
-      !willFallback && !haveTooManyFields
+      val hasTooManyOutputFields =
+        numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+      val hasTooManyInputFields =
+        plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
+      !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/52a80112/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e8e8010..4725168 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -620,6 +620,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val df = streaming.join(static, Seq("b"))
     assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
   }
+
+  test("SPARK-14554: Dataset.map may generate wrong java code for wide table") {
+    val wideDF = sqlContext.range(10).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+    // Make sure the generated code for this plan can compile and execute.
+    wideDF.map(_.getLong(0)).collect()
+  }
 }
 
 case class OtherTuple(_1: String, _2: Int)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org