You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/25 10:10:08 UTC
git commit: [SPARK-2529] Clean closures in foreach and foreachPartition.
Repository: spark
Updated Branches:
refs/heads/master 8529ced35 -> eb82abd8e
[SPARK-2529] Clean closures in foreach and foreachPartition.
Author: Reynold Xin <rx...@apache.org>
Closes #1583 from rxin/closureClean and squashes the following commits:
8982fe6 [Reynold Xin] [SPARK-2529] Clean closures in foreach and foreachPartition.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb82abd8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb82abd8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb82abd8
Branch: refs/heads/master
Commit: eb82abd8e3d25c912fa75201cf4f429aab8d73c7
Parents: 8529ced
Author: Reynold Xin <rx...@apache.org>
Authored: Fri Jul 25 01:10:05 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Jul 25 01:10:05 2014 -0700
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/eb82abd8/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index edbf7ea..b1c965a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -754,14 +754,16 @@ abstract class RDD[T: ClassTag](
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit) {
- sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
+ val cleanF = sc.clean(f)
+ sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
/**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: Iterator[T] => Unit) {
- sc.runJob(this, (iter: Iterator[T]) => f(iter))
+ val cleanF = sc.clean(f)
+ sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
/**