You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/24 09:09:40 UTC
git commit: [SPARK-2661][bagel]unpersist old processed rdd
Repository: spark
Updated Branches:
refs/heads/master e34922a22 -> 42dfab7d3
[SPARK-2661][bagel]unpersist old processed rdd
Unpersist useless rdd during bagel iteration to make full use of memory.
Author: Daoyuan <da...@intel.com>
Closes #1519 from adrian-wang/bagelunpersist and squashes the following commits:
182c9dd [Daoyuan] rename var nextUseless to lastRDD
87fd3a4 [Daoyuan] bagel unpersist old processed rdd
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42dfab7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42dfab7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42dfab7d
Branch: refs/heads/master
Commit: 42dfab7d374cf64a39b692ebc089792a4ff7e42c
Parents: e34922a
Author: Daoyuan <da...@intel.com>
Authored: Thu Jul 24 00:09:36 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Jul 24 00:09:36 2014 -0700
----------------------------------------------------------------------
bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/42dfab7d/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 70a99b3..ef0bb2a 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -72,6 +72,7 @@ object Bagel extends Logging {
var verts = vertices
var msgs = messages
var noActivity = false
+ var lastRDD: RDD[(K, (V, Array[M]))] = null
do {
logInfo("Starting superstep " + superstep + ".")
val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@ object Bagel extends Logging {
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
val (processed, numMsgs, numActiveVerts) =
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+ if (lastRDD != null) {
+ lastRDD.unpersist(false)
+ }
+ lastRDD = processed
val timeTaken = System.currentTimeMillis - startTime
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))