You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/10/20 01:16:34 UTC

spark git commit: [SPARK-11051][CORE] Do not allow local checkpointing after the RDD is materialized and checkpointed

Repository: spark
Updated Branches:
  refs/heads/master 7ab0ce650 -> a1413b366


[SPARK-11051][CORE] Do not allow local checkpointing after the RDD is materialized and checkpointed

JIRA: https://issues.apache.org/jira/browse/SPARK-11051

When a `RDD` is materialized and checkpointed, its partitions and dependencies are cleared. If we allow local checkpointing on it and assign `LocalRDDCheckpointData` to its `checkpointData`. Next time when the RDD is materialized again, the error will be thrown.

Author: Liang-Chi Hsieh <vi...@appier.com>

Closes #9072 from viirya/no-localcheckpoint-after-checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1413b36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1413b36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1413b36

Branch: refs/heads/master
Commit: a1413b3662250dd5e980e8b1f7c3dc4585ab4766
Parents: 7ab0ce6
Author: Liang-Chi Hsieh <vi...@appier.com>
Authored: Mon Oct 19 16:16:31 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Oct 19 16:16:31 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 35 ++++++++++++++++----
 .../org/apache/spark/CheckpointSuite.scala      |  4 +++
 2 files changed, 32 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a1413b36/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a56e542..a97bb17 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -294,7 +294,11 @@ abstract class RDD[T: ClassTag](
    */
   private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
   {
-    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
+    if (isCheckpointedAndMaterialized) {
+      firstParent[T].iterator(split, context)
+    } else {
+      compute(split, context)
+    }
   }
 
   /**
@@ -1520,21 +1524,38 @@ abstract class RDD[T: ClassTag](
       persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
     }
 
-    checkpointData match {
-      case Some(reliable: ReliableRDDCheckpointData[_]) => logWarning(
-        "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
-      case _ =>
+    // If this RDD is already checkpointed and materialized, its lineage is already truncated.
+    // We must not override our `checkpointData` in this case because it is needed to recover
+    // the checkpointed data. If it is overridden, next time materializing on this RDD will
+    // cause error.
+    if (isCheckpointedAndMaterialized) {
+      logWarning("Not marking RDD for local checkpoint because it was already " +
+        "checkpointed and materialized")
+    } else {
+      // Lineage is not truncated yet, so just override any existing checkpoint data with ours
+      checkpointData match {
+        case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
+          "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
+        case _ =>
+      }
+      checkpointData = Some(new LocalRDDCheckpointData(this))
     }
-    checkpointData = Some(new LocalRDDCheckpointData(this))
     this
   }
 
   /**
-   * Return whether this RDD is marked for checkpointing, either reliably or locally.
+   * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    */
   def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
+   * Return whether this RDD is checkpointed and materialized, either reliably or locally.
+   * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
+   * return value. Exposed for testing.
+   */
+  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+
+  /**
    * Return whether this RDD is marked for local checkpointing.
    * Exposed for testing.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/a1413b36/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 4d70bfe..119e5fc 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -241,9 +241,13 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
     val rdd = new BlockRDD[Int](sc, Array[BlockId]())
     assert(rdd.partitions.size === 0)
     assert(rdd.isCheckpointed === false)
+    assert(rdd.isCheckpointedAndMaterialized === false)
     checkpoint(rdd, reliableCheckpoint)
+    assert(rdd.isCheckpointed === false)
+    assert(rdd.isCheckpointedAndMaterialized === false)
     assert(rdd.count() === 0)
     assert(rdd.isCheckpointed === true)
+    assert(rdd.isCheckpointedAndMaterialized === true)
     assert(rdd.partitions.size === 0)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org