You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/15 00:03:30 UTC
spark git commit: [SQL] Don't shuffle code generated rows
Repository: spark
Updated Branches:
refs/heads/master f805025e8 -> 4b4b50c9e
[SQL] Don't shuffle code generated rows
When sort-based shuffle and code generation are both enabled, we were trying to ship the code-generated rows during a shuffle. This doesn't work because the generated classes don't exist on the other side. Instead, we now copy into a generic row before shipping.
Author: Michael Armbrust <mi...@databricks.com>
Closes #3263 from marmbrus/aggCodeGen and squashes the following commits:
f6ba8cf [Michael Armbrust] fix and test
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b4b50c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b4b50c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b4b50c9
Branch: refs/heads/master
Commit: 4b4b50c9e596673c1534df97effad50d107a8007
Parents: f805025
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Nov 14 15:03:23 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Nov 14 15:03:23 2014 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/sql/execution/Exchange.scala | 4 ++--
.../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 +++++++
2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4b4b50c9/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 927f400..cff7a01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -47,8 +47,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
// TODO: Eliminate redundant expressions in grouping key and value.
val rdd = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter =>
- val hashExpressions = newProjection(expressions, child.output)
- iter.map(r => (hashExpressions(r), r.copy()))
+ val hashExpressions = newMutableProjection(expressions, child.output)()
+ iter.map(r => (hashExpressions(r).copy(), r.copy()))
}
} else {
child.execute().mapPartitions { iter =>
http://git-wip-us.apache.org/repos/asf/spark/blob/4b4b50c9/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8a80724..5dd777f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -72,6 +72,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
2.5)
}
+ test("aggregation with codegen") {
+ val originalValue = codegenEnabled
+ setConf(SQLConf.CODEGEN_ENABLED, "true")
+ sql("SELECT key FROM testData GROUP BY key").collect()
+ setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+ }
+
test("SPARK-3176 Added Parser of SQL LAST()") {
checkAnswer(
sql("SELECT LAST(n) FROM lowerCaseData"),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org