Posted to commits@spark.apache.org by jo...@apache.org on 2014/12/22 23:26:38 UTC
spark git commit: [SPARK-4818][Core] Add 'iterator' to reduce memory consumed by join
Repository: spark
Updated Branches:
refs/heads/master de9d7d2b5 -> c233ab3d8
[SPARK-4818][Core] Add 'iterator' to reduce memory consumed by join
In Scala, `map` and `flatMap` on an `Iterable` eagerly copy the contents into a new collection, whereas the same methods on an `Iterator` are lazy. For example,
```Scala
val iterable = Seq(1, 2, 3).map(v => {
  println(v)
  v
})
println("Iterable map done")
val iterator = Seq(1, 2, 3).iterator.map(v => {
  println(v)
  v
})
println("Iterator map done")
```
prints
```
1
2
3
Iterable map done
Iterator map done
```
The `Iterator` version prints nothing because `Iterator.map` is lazy: the function is applied only when the iterator is actually consumed. So we should use `iterator` to avoid materializing these intermediate collections and reduce the memory consumed by join.
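To make the pattern in the patch concrete, here is a minimal plain-Scala sketch (no Spark required). `vs` and `ws` stand in for the two per-key `Iterable`s that `cogroup` produces; the object and value names are illustrative, not from the patch.
```Scala
object IteratorJoinSketch {
  def main(args: Array[String]): Unit = {
    val vs: Iterable[Int] = Seq(1, 2, 3)
    val ws: Iterable[String] = Seq("a", "b")

    // Before: a for-comprehension over Iterables materializes the whole
    // per-key cross product (vs.size * ws.size pairs) as a new collection.
    val eager = for (v <- vs; w <- ws) yield (v, w)

    // After: the same pairs are produced lazily, one at a time, as the
    // iterator is consumed; no intermediate buffer is built here.
    val lazyPairs = for (v <- vs.iterator; w <- ws.iterator) yield (v, w)

    println(eager)            // List((1,a), (1,b), (2,a), (2,b), (3,a), (3,b))
    println(lazyPairs.toList) // same pairs; toList forces them only for display
  }
}
```
Because `flatMapValues` only needs to traverse the result once, returning an iterator is sufficient and drops the intermediate buffer.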
Found by Johannes Simon in http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3C5BE70814-9D03-4F61-AE2C-0D63F2DE4446%40mail.de%3E
Author: zsxwing <zs...@gmail.com>
Closes #3671 from zsxwing/SPARK-4824 and squashes the following commits:
48ee7b9 [zsxwing] Remove the explicit types
95d59d6 [zsxwing] Add 'iterator' to reduce memory consumed by join
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c233ab3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c233ab3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c233ab3d
Branch: refs/heads/master
Commit: c233ab3d8d75a33495298964fe73dbf7dd8fe305
Parents: de9d7d2
Author: zsxwing <zs...@gmail.com>
Authored: Mon Dec 22 14:26:28 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Mon Dec 22 14:26:28 2014 -0800
----------------------------------------------------------------------
.../org/apache/spark/rdd/PairRDDFunctions.scala | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c233ab3d/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index fe3129b..4469c89 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -483,7 +483,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues( pair =>
- for (v <- pair._1; w <- pair._2) yield (v, w)
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
@@ -496,9 +496,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._2.isEmpty) {
- pair._1.map(v => (v, None))
+ pair._1.iterator.map(v => (v, None))
} else {
- for (v <- pair._1; w <- pair._2) yield (v, Some(w))
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
}
}
}
@@ -513,9 +513,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { pair =>
if (pair._1.isEmpty) {
- pair._2.map(w => (None, w))
+ pair._2.iterator.map(w => (None, w))
} else {
- for (v <- pair._1; w <- pair._2) yield (Some(v), w)
+ for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
}
}
}
@@ -531,9 +531,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues {
- case (vs, Seq()) => vs.map(v => (Some(v), None))
- case (Seq(), ws) => ws.map(w => (None, Some(w)))
- case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+ case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
+ case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
+ case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
}
}
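For context, the patched code paths are reached through the ordinary pair-RDD join APIs. A sketch of such a call follows; the application name, master setting, and sample data are illustrative assumptions, not part of the patch.
```Scala
// Illustrative only: a small local Spark job whose joins go through the
// patched flatMapValues paths in PairRDDFunctions.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object JoinExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JoinExample").setMaster("local[*]"))
    val left  = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
    val right = sc.parallelize(Seq((1, 10), (2, 20), (3, 30)))

    // Each of these now builds its per-key results from iterators, so keys
    // with many matching values no longer materialize the full cross
    // product in an intermediate collection.
    left.join(right).collect().foreach(println)
    left.leftOuterJoin(right).collect().foreach(println)
    left.fullOuterJoin(right).collect().foreach(println)

    sc.stop()
  }
}
```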