You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/18 08:57:12 UTC
git commit: SPARK-2553. CoGroupedRDD unnecessarily allocates a Tuple2 per dependency per key
Repository: spark
Updated Branches:
refs/heads/master 29809a6d5 -> e52b8719c
SPARK-2553. CoGroupedRDD unnecessarily allocates a Tuple2 per dependency per key
My humble opinion is that avoiding allocations in this performance-critical section is worth the extra code.
Author: Sandy Ryza <sa...@cloudera.com>
Closes #1461 from sryza/sandy-spark-2553 and squashes the following commits:
7eaf7f2 [Sandy Ryza] SPARK-2553. CoGroupedRDD unnecessarily allocates a Tuple2 per dependency per key
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e52b8719
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e52b8719
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e52b8719
Branch: refs/heads/master
Commit: e52b8719cf0603e79ded51cbe1c9f88eea8b56de
Parents: 29809a6
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Thu Jul 17 23:57:08 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Jul 17 23:57:08 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e52b8719/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 5951865..b284b63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -180,7 +180,11 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
(combiner1, combiner2) => {
- combiner1.zip(combiner2).map { case (v1, v2) => v1 ++ v2 }
+ var depNum = 0
+ while (depNum < numRdds) {
+ combiner1(depNum) ++= combiner2(depNum)
+ depNum += 1
+ }
}
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
createCombiner, mergeValue, mergeCombiners)