You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/06 09:33:06 UTC
git commit: [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
Repository: spark
Updated Branches:
refs/heads/master 9422c4ee0 -> 1b9001f78
[SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
This is a teeny-tiny optimization that moves the `if` check on sortBasedShuffleOn outside the closures, so the closures don't need to pull in the entire Exchange operator object.
Author: Reynold Xin <rx...@apache.org>
Closes #2282 from rxin/SPARK-3409 and squashes the following commits:
1de3f88 [Reynold Xin] [SPARK-3409][SQL] Avoid pulling in Exchange operator itself in Exchange's closures.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b9001f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b9001f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b9001f7
Branch: refs/heads/master
Commit: 1b9001f78d96faefff02b846b169c249d9e4d612
Parents: 9422c4e
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Sep 6 00:33:00 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Sep 6 00:33:00 2014 -0700
----------------------------------------------------------------------
.../apache/spark/sql/execution/Exchange.scala | 43 ++++++++++----------
1 file changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1b9001f7/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 4802e40..927f400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -36,25 +36,23 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
override def outputPartitioning = newPartitioning
- def output = child.output
+ override def output = child.output
/** We must copy rows when sort based shuffle is on */
protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
- def execute() = attachTree(this , "execute") {
+ override def execute() = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
- val rdd = child.execute().mapPartitions { iter =>
- if (sortBasedShuffleOn) {
- @transient val hashExpressions =
- newProjection(expressions, child.output)
-
+ val rdd = if (sortBasedShuffleOn) {
+ child.execute().mapPartitions { iter =>
+ val hashExpressions = newProjection(expressions, child.output)
iter.map(r => (hashExpressions(r), r.copy()))
- } else {
- @transient val hashExpressions =
- newMutableProjection(expressions, child.output)()
-
+ }
+ } else {
+ child.execute().mapPartitions { iter =>
+ val hashExpressions = newMutableProjection(expressions, child.output)()
val mutablePair = new MutablePair[Row, Row]()
iter.map(r => mutablePair.update(hashExpressions(r), r))
}
@@ -65,17 +63,18 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._2)
case RangePartitioning(sortingExpressions, numPartitions) =>
- // TODO: RangePartitioner should take an Ordering.
- implicit val ordering = new RowOrdering(sortingExpressions, child.output)
-
- val rdd = child.execute().mapPartitions { iter =>
- if (sortBasedShuffleOn) {
- iter.map(row => (row.copy(), null))
- } else {
+ val rdd = if (sortBasedShuffleOn) {
+ child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))}
+ } else {
+ child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Row, Null](null, null)
iter.map(row => mutablePair.update(row, null))
}
}
+
+ // TODO: RangePartitioner should take an Ordering.
+ implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+
val part = new RangePartitioner(numPartitions, rdd, ascending = true)
val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part)
shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
@@ -83,10 +82,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._1)
case SinglePartition =>
- val rdd = child.execute().mapPartitions { iter =>
- if (sortBasedShuffleOn) {
- iter.map(r => (null, r.copy()))
- } else {
+ val rdd = if (sortBasedShuffleOn) {
+ child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
+ } else {
+ child.execute().mapPartitions { iter =>
val mutablePair = new MutablePair[Null, Row]()
iter.map(r => mutablePair.update(null, r))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org