You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/19 10:37:14 UTC
spark git commit: [SPARK-6963][CORE]Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
Repository: spark
Updated Branches:
refs/heads/master 8fbd45c74 -> 0424da68d
[SPARK-6963][CORE]Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
cc andrewor14
Author: GuoQiang Li <wi...@qq.com>
Closes #5548 from witgo/SPARK-6963 and squashes the following commits:
964aea7 [GuoQiang Li] review commits
b08b3c9 [GuoQiang Li] Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0424da68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0424da68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0424da68
Branch: refs/heads/master
Commit: 0424da68d4c81dc3a9944d8485feb1233c6633c4
Parents: 8fbd45c
Author: GuoQiang Li <wi...@qq.com>
Authored: Sun Apr 19 09:37:09 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Apr 19 09:37:09 2015 +0100
----------------------------------------------------------------------
.../scala/org/apache/spark/ContextCleaner.scala | 2 ++
.../org/apache/spark/ContextCleanerSuite.scala | 21 ++++++++++++++------
2 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0424da68/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 715b259..37198d8 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
+ listeners.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
@@ -260,4 +261,5 @@ private[spark] trait CleanerListener {
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
def accumCleaned(accId: Long)
+ def checkpointCleaned(rddId: Long)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0424da68/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 097e707..c7868dd 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(path))
// the checkpoint is not cleaned by default (without the configuration set)
- var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
+ var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
@@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
// Test that GC causes checkpoint data cleanup after dereferencing the RDD
- postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
+ postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
@@ -406,12 +406,14 @@ class CleanerTester(
sc: SparkContext,
rddIds: Seq[Int] = Seq.empty,
shuffleIds: Seq[Int] = Seq.empty,
- broadcastIds: Seq[Long] = Seq.empty)
+ broadcastIds: Seq[Long] = Seq.empty,
+ checkpointIds: Seq[Long] = Seq.empty)
extends Logging {
val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
+ val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal
val cleanerListener = new CleanerListener {
@@ -427,12 +429,17 @@ class CleanerTester(
def broadcastCleaned(broadcastId: Long): Unit = {
toBeCleanedBroadcstIds -= broadcastId
- logInfo("Broadcast" + broadcastId + " cleaned")
+ logInfo("Broadcast " + broadcastId + " cleaned")
}
def accumCleaned(accId: Long): Unit = {
logInfo("Cleaned accId " + accId + " cleaned")
}
+
+ def checkpointCleaned(rddId: Long): Unit = {
+ toBeCheckpointIds -= rddId
+ logInfo("checkpoint " + rddId + " cleaned")
+ }
}
val MAX_VALIDATION_ATTEMPTS = 10
@@ -456,7 +463,8 @@ class CleanerTester(
/** Verify that RDDs, shuffles, etc. occupy resources */
private def preCleanupValidate() {
- assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup")
+ assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty ||
+ checkpointIds.nonEmpty, "Nothing to cleanup")
// Verify the RDDs have been persisted and blocks are present
rddIds.foreach { rddId =>
@@ -547,7 +555,8 @@ class CleanerTester(
private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
- toBeCleanedBroadcstIds.isEmpty
+ toBeCleanedBroadcstIds.isEmpty &&
+ toBeCheckpointIds.isEmpty
private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org