You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/10/20 01:16:34 UTC
spark git commit: [SPARK-11051][CORE] Do not allow local
checkpointing after the RDD is materialized and checkpointed
Repository: spark
Updated Branches:
refs/heads/master 7ab0ce650 -> a1413b366
[SPARK-11051][CORE] Do not allow local checkpointing after the RDD is materialized and checkpointed
JIRA: https://issues.apache.org/jira/browse/SPARK-11051
When an `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we allow local checkpointing on it and assign a `LocalRDDCheckpointData` to its `checkpointData`, then the next time the RDD is materialized, an error will be thrown.
Author: Liang-Chi Hsieh <vi...@appier.com>
Closes #9072 from viirya/no-localcheckpoint-after-checkpoint.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1413b36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1413b36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1413b36
Branch: refs/heads/master
Commit: a1413b3662250dd5e980e8b1f7c3dc4585ab4766
Parents: 7ab0ce6
Author: Liang-Chi Hsieh <vi...@appier.com>
Authored: Mon Oct 19 16:16:31 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Oct 19 16:16:31 2015 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 35 ++++++++++++++++----
.../org/apache/spark/CheckpointSuite.scala | 4 +++
2 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a1413b36/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a56e542..a97bb17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -294,7 +294,11 @@ abstract class RDD[T: ClassTag](
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
- if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
+ if (isCheckpointedAndMaterialized) {
+ firstParent[T].iterator(split, context)
+ } else {
+ compute(split, context)
+ }
}
/**
@@ -1520,21 +1524,38 @@ abstract class RDD[T: ClassTag](
persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
}
- checkpointData match {
- case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning(
- "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
- case _ =>
+ // If this RDD is already checkpointed and materialized, its lineage is already truncated.
+ // We must not override our `checkpointData` in this case because it is needed to recover
+ // the checkpointed data. If it is overridden, next time materializing on this RDD will
+ // cause error.
+ if (isCheckpointedAndMaterialized) {
+ logWarning("Not marking RDD for local checkpoint because it was already " +
+ "checkpointed and materialized")
+ } else {
+ // Lineage is not truncated yet, so just override any existing checkpoint data with ours
+ checkpointData match {
+ case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
+ "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
+ case _ =>
+ }
+ checkpointData = Some(new LocalRDDCheckpointData(this))
}
- checkpointData = Some(new LocalRDDCheckpointData(this))
this
}
/**
- * Return whether this RDD is marked for checkpointing, either reliably or locally.
+ * Return whether this RDD is checkpointed and materialized, either reliably or locally.
*/
def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
/**
+ * Return whether this RDD is checkpointed and materialized, either reliably or locally.
+ * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
+ * return value. Exposed for testing.
+ */
+ private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+
+ /**
* Return whether this RDD is marked for local checkpointing.
* Exposed for testing.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/a1413b36/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 4d70bfe..119e5fc 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -241,9 +241,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
val rdd = new BlockRDD[Int](sc, Array[BlockId]())
assert(rdd.partitions.size === 0)
assert(rdd.isCheckpointed === false)
+ assert(rdd.isCheckpointedAndMaterialized === false)
checkpoint(rdd, reliableCheckpoint)
+ assert(rdd.isCheckpointed === false)
+ assert(rdd.isCheckpointedAndMaterialized === false)
assert(rdd.count() === 0)
assert(rdd.isCheckpointed === true)
+ assert(rdd.isCheckpointedAndMaterialized === true)
assert(rdd.partitions.size === 0)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org