You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/07 06:34:49 UTC
spark git commit: [SPARK-12678][CORE] MapPartitionsRDD clearDependencies
Repository: spark
Updated Branches:
refs/heads/master 174e72cec -> b67385203
[SPARK-12678][CORE] MapPartitionsRDD clearDependencies
MapPartitionsRDD was keeping a reference to `prev` after a call to
`clearDependencies`, which could lead to a memory leak.
Author: Guillaume Poulin <po...@gmail.com>
Closes #10623 from gpoulin/map_partition_deps.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6738520
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6738520
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6738520
Branch: refs/heads/master
Commit: b6738520374637347ab5ae6c801730cdb6b35daa
Parents: 174e72c
Author: Guillaume Poulin <po...@gmail.com>
Authored: Wed Jan 6 21:34:46 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 6 21:34:46 2016 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b6738520/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 4312d3a..e4587c9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -25,7 +25,7 @@ import org.apache.spark.{Partition, TaskContext}
* An RDD that applies the provided function to every partition of the parent RDD.
*/
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
- prev: RDD[T],
+ var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {
@@ -36,4 +36,9 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ prev = null
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org