You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/24 09:09:40 UTC

git commit: [SPARK-2661][bagel]unpersist old processed rdd

Repository: spark
Updated Branches:
  refs/heads/master e34922a22 -> 42dfab7d3


[SPARK-2661][bagel]unpersist old processed rdd

Unpersist useless rdd during bagel iteration to make full use of memory.

Author: Daoyuan <da...@intel.com>

Closes #1519 from adrian-wang/bagelunpersist and squashes the following commits:

182c9dd [Daoyuan] rename var nextUseless to lastRDD
87fd3a4 [Daoyuan] bagel unpersist old processed rdd


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42dfab7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42dfab7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42dfab7d

Branch: refs/heads/master
Commit: 42dfab7d374cf64a39b692ebc089792a4ff7e42c
Parents: e34922a
Author: Daoyuan <da...@intel.com>
Authored: Thu Jul 24 00:09:36 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Jul 24 00:09:36 2014 -0700

----------------------------------------------------------------------
 bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42dfab7d/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 70a99b3..ef0bb2a 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -72,6 +72,7 @@ object Bagel extends Logging {
     var verts = vertices
     var msgs = messages
     var noActivity = false
+    var lastRDD: RDD[(K, (V, Array[M]))] = null
     do {
       logInfo("Starting superstep " + superstep + ".")
       val startTime = System.currentTimeMillis
@@ -83,6 +84,10 @@ object Bagel extends Logging {
       val superstep_ = superstep  // Create a read-only copy of superstep for capture in closure
       val (processed, numMsgs, numActiveVerts) =
         comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
+      if (lastRDD != null) {
+        lastRDD.unpersist(false)
+      }
+      lastRDD = processed
 
       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))