Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/15 00:03:30 UTC

spark git commit: [SQL] Don't shuffle code generated rows

Repository: spark
Updated Branches:
  refs/heads/master f805025e8 -> 4b4b50c9e


[SQL] Don't shuffle code generated rows

When sort-based shuffle and code generation are both enabled, we were trying to ship the code-generated rows during a shuffle. This doesn't work because the generated classes don't exist on the other side. Instead, we now copy into a generic row before shipping.
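For context, the sketch below illustrates the idea behind the fix in plain Scala. It does not use Spark's actual Row classes; SpecificRow and GenericRow are illustrative stand-ins. A row class compiled at runtime in one JVM cannot be deserialized on an executor that never compiled that class, so each row is copied into a generic, array-backed row before it is handed to the shuffle.

    // Minimal, self-contained sketch; names are illustrative, not Spark APIs.
    object CopyBeforeShuffle {
      trait Row extends Serializable { def values: Array[Any]; def copy(): Row }

      // Stands in for a generic row: built only from standard classes,
      // so it can be deserialized on any executor.
      final class GenericRow(val values: Array[Any]) extends Row {
        override def copy(): Row = new GenericRow(values.clone())
      }

      // Stands in for a code-generated row class that exists only in this JVM.
      final class SpecificRow(a: Int, b: String) extends Row {
        override def values: Array[Any] = Array(a, b)
        override def copy(): Row = new GenericRow(values)  // materialize into the generic form
      }

      def main(args: Array[String]): Unit = {
        val partition: Iterator[Row] = Iterator(new SpecificRow(1, "x"), new SpecificRow(2, "y"))
        // Analogous to the fixed Exchange code: copy before the (key, row) pair is shuffled.
        val shippable = partition.map(r => (r.values.head, r.copy()))
        shippable.foreach { case (k, r) => println(s"$k -> ${r.values.mkString(", ")}") }
      }
    }

The extra copy() on hashExpressions(r) in the diff serves a related purpose: a mutable projection reuses a single output buffer, so the projected key also has to be snapshotted before it is shipped.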

Author: Michael Armbrust <mi...@databricks.com>

Closes #3263 from marmbrus/aggCodeGen and squashes the following commits:

f6ba8cf [Michael Armbrust] fix and test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b4b50c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b4b50c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b4b50c9

Branch: refs/heads/master
Commit: 4b4b50c9e596673c1534df97effad50d107a8007
Parents: f805025
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Nov 14 15:03:23 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Nov 14 15:03:23 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/execution/Exchange.scala  | 4 ++--
 .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala   | 7 +++++++
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b4b50c9/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 927f400..cff7a01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,8 +47,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         // TODO: Eliminate redundant expressions in grouping key and value.
         val rdd = if (sortBasedShuffleOn) {
           child.execute().mapPartitions { iter =>
-            val hashExpressions = newProjection(expressions, child.output)
-            iter.map(r => (hashExpressions(r), r.copy()))
+            val hashExpressions = newMutableProjection(expressions, child.output)()
+            iter.map(r => (hashExpressions(r).copy(), r.copy()))
           }
         } else {
           child.execute().mapPartitions { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/4b4b50c9/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8a80724..5dd777f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       2.5)
   }
 
+  test("aggregation with codegen") {
+    val originalValue = codegenEnabled
+    setConf(SQLConf.CODEGEN_ENABLED, "true")
+    sql("SELECT key FROM testData GROUP BY key").collect()
+    setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+  }
+
   test("SPARK-3176 Added Parser of SQL LAST()") {
     checkAnswer(
       sql("SELECT LAST(n) FROM lowerCaseData"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org