Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/19 10:37:14 UTC

spark git commit: [SPARK-6963][CORE] Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint

Repository: spark
Updated Branches:
  refs/heads/master 8fbd45c74 -> 0424da68d


[SPARK-6963][CORE] Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint

cc andrewor14

Author: GuoQiang Li <wi...@qq.com>

Closes #5548 from witgo/SPARK-6963 and squashes the following commits:

964aea7 [GuoQiang Li] review commits
b08b3c9 [GuoQiang Li] Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0424da68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0424da68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0424da68

Branch: refs/heads/master
Commit: 0424da68d4c81dc3a9944d8485feb1233c6633c4
Parents: 8fbd45c
Author: GuoQiang Li <wi...@qq.com>
Authored: Sun Apr 19 09:37:09 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Apr 19 09:37:09 2015 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala |  2 ++
 .../org/apache/spark/ContextCleanerSuite.scala  | 21 ++++++++++++++------
 2 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0424da68/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 715b259..37198d8 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     try {
       logDebug("Cleaning rdd checkpoint data " + rddId)
       RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
+      listeners.foreach(_.checkpointCleaned(rddId))
       logInfo("Cleaned rdd checkpoint data " + rddId)
     }
     catch {
@@ -260,4 +261,5 @@ private[spark] trait CleanerListener {
   def shuffleCleaned(shuffleId: Int)
   def broadcastCleaned(broadcastId: Long)
   def accumCleaned(accId: Long)
+  def checkpointCleaned(rddId: Long)
 }
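Because checkpointCleaned is added to the CleanerListener trait itself, every implementation of the trait now has to supply it, which is why the test's listener in the next hunk grows a matching method. A sketch of a complete implementation (illustrative only: the trait is private[spark], so real implementations live inside org.apache.spark; rddCleaned is part of the trait but falls outside the hunk shown):

    class LoggingCleanerListener extends CleanerListener {
      def rddCleaned(rddId: Int): Unit = println(s"RDD $rddId cleaned")
      def shuffleCleaned(shuffleId: Int): Unit = println(s"Shuffle $shuffleId cleaned")
      def broadcastCleaned(broadcastId: Long): Unit = println(s"Broadcast $broadcastId cleaned")
      def accumCleaned(accId: Long): Unit = println(s"Accumulator $accId cleaned")
      def checkpointCleaned(rddId: Long): Unit = println(s"Checkpoint for RDD $rddId cleaned")
    }

Such a listener is registered through ContextCleaner.attachListener, which is how CleanerTester wires itself into the cleaner (assumed from the suite's setup; the registration itself is not shown in these hunks).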

http://git-wip-us.apache.org/repos/asf/spark/blob/0424da68/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 097e707..c7868dd 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     assert(fs.exists(path))
 
     // the checkpoint is not cleaned by default (without the configuration set)
-    var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
+    var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
     rdd = null // Make RDD out of scope
     runGC()
     postGCTester.assertCleanup()
@@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
 
     // Test that GC causes checkpoint data cleanup after dereferencing the RDD
-    postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
+    postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
     rdd = null // Make RDD out of scope
     runGC()
     postGCTester.assertCleanup()
@@ -406,12 +406,14 @@ class CleanerTester(
     sc: SparkContext,
     rddIds: Seq[Int] = Seq.empty,
     shuffleIds: Seq[Int] = Seq.empty,
-    broadcastIds: Seq[Long] = Seq.empty)
+    broadcastIds: Seq[Long] = Seq.empty,
+    checkpointIds: Seq[Long] = Seq.empty)
   extends Logging {
 
   val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
   val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
   val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
+  val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
   val isDistributed = !sc.isLocal
 
   val cleanerListener = new CleanerListener {
@@ -427,12 +429,17 @@ class CleanerTester(
 
     def broadcastCleaned(broadcastId: Long): Unit = {
       toBeCleanedBroadcstIds -= broadcastId
-      logInfo("Broadcast" + broadcastId + " cleaned")
+      logInfo("Broadcast " + broadcastId + " cleaned")
     }
 
     def accumCleaned(accId: Long): Unit = {
       logInfo("Cleaned accId " + accId + " cleaned")
     }
+
+    def checkpointCleaned(rddId: Long): Unit = {
+      toBeCheckpointIds -= rddId
+      logInfo("checkpoint  " + rddId + " cleaned")
+    }
   }
 
   val MAX_VALIDATION_ATTEMPTS = 10
@@ -456,7 +463,8 @@ class CleanerTester(
 
   /** Verify that RDDs, shuffles, etc. occupy resources */
   private def preCleanupValidate() {
-    assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup")
+    assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty ||
+      checkpointIds.nonEmpty, "Nothing to cleanup")
 
     // Verify the RDDs have been persisted and blocks are present
     rddIds.foreach { rddId =>
@@ -547,7 +555,8 @@ class CleanerTester(
   private def isAllCleanedUp =
     toBeCleanedRDDIds.isEmpty &&
     toBeCleanedShuffleIds.isEmpty &&
-    toBeCleanedBroadcstIds.isEmpty
+    toBeCleanedBroadcstIds.isEmpty &&
+    toBeCheckpointIds.isEmpty
 
   private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
     blockManager.master.getMatchingBlockIds( _ match {

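CleanerTester bounds its validation (MAX_VALIDATION_ATTEMPTS = 10 appears as context in the hunk above), which suggests a retry-until-clean loop rather than an unbounded wait on isAllCleanedUp. A minimal sketch of that bounded-retry idiom, with illustrative names rather than the suite's actual helper:

    import scala.annotation.tailrec

    object RetryCheck {
      // Re-evaluate `condition` up to `attempts` times, sleeping between
      // tries; fail loudly if it never becomes true.
      @tailrec
      def awaitCondition(attempts: Int, delayMs: Long)(condition: => Boolean): Unit = {
        if (condition) ()                      // cleaned up: done
        else if (attempts <= 1) sys.error("condition not met after retries")
        else {
          Thread.sleep(delayMs)                // give the async cleaner time to run
          awaitCondition(attempts - 1, delayMs)(condition)
        }
      }
    }

    // Usage, mirroring the tester's post-GC assertion:
    //   RetryCheck.awaitCondition(attempts = 10, delayMs = 100)(isAllCleanedUp)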
