You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 22:21:09 UTC

[05/13] git commit: Added tests for PartitionerAwareUnionRDD in the CheckpointSuite. Refactored CheckpointSuite to make the tests simpler and more reliable. Added missing test for ZippedRDD.

Added tests for PartitionerAwareUnionRDD in the CheckpointSuite. Refactored CheckpointSuite to make the tests simpler and more reliable. Added missing test for ZippedRDD.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/61f4bbda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/61f4bbda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/61f4bbda

Branch: refs/heads/master
Commit: 61f4bbda0d4e3ecbd8b955232a741231936a25de
Parents: de41c43
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Dec 20 00:41:47 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Dec 20 00:41:47 2013 -0800

----------------------------------------------------------------------
 .../spark/rdd/PartitionerAwareUnionRDD.scala    |  38 +-
 .../apache/spark/rdd/RDDCheckpointData.scala    |   2 +-
 .../org/apache/spark/CheckpointSuite.scala      | 361 +++++++++++--------
 3 files changed, 231 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61f4bbda/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 96cf93f..995042e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -1,16 +1,29 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partition}
+import scala.reflect.ClassTag
+import java.io.{ObjectOutputStream, IOException}
 
 private[spark]
-class PartitionerAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition])
-  extends Partition {
+class PartitionerAwareUnionRDDPartition(
+    @transient val rdds: Seq[RDD[_]],
+    val idx: Int
+  ) extends Partition {
+  // Built from `idx`, not `index`: `index` is only assigned below and would still read as 0 here.
+  var parents = rdds.map(_.partitions(idx)).toArray
+  override val index = idx
+  override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent partition at the time of task serialization
+    parents = rdds.map(_.partitions(index)).toArray
+    oos.defaultWriteObject()
+  }
+}
 
 private[spark]
-class PartitionerAwareUnionRDD[T: ClassManifest](
+class PartitionerAwareUnionRDD[T: ClassTag](
     sc: SparkContext,
     var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
@@ -21,17 +34,16 @@ class PartitionerAwareUnionRDD[T: ClassManifest](
   override val partitioner = rdds.head.partitioner
 
   override def getPartitions: Array[Partition] = {
-    val numPartitions = rdds.head.partitions.length
+    val numPartitions = partitioner.get.numPartitions
     (0 until numPartitions).map(index => {
-      val parentPartitions = rdds.map(_.partitions(index)).toArray
-      new PartitionerAwareUnionRDDPartition(index, parentPartitions)
+      new PartitionerAwareUnionRDDPartition(rdds, index)
     }).toArray
   }
 
   // Get the location where most of the partitions of parent RDDs are located
   override def getPreferredLocations(s: Partition): Seq[String] = {
     logDebug("Getting preferred locations for " + this)
-    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].partitions
+    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
     val locations = rdds.zip(parentPartitions).flatMap {
       case (rdd, part) => {
         val parentLocations = currPrefLocs(rdd, part)
@@ -39,7 +51,6 @@ class PartitionerAwareUnionRDD[T: ClassManifest](
         parentLocations
       }
     }
-
     if (locations.isEmpty) {
       Seq.empty
     } else  {
@@ -48,18 +59,19 @@ class PartitionerAwareUnionRDD[T: ClassManifest](
   }
 
   override def compute(s: Partition, context: TaskContext): Iterator[T] = {
-    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].partitions
+    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
     rdds.zip(parentPartitions).iterator.flatMap {
       case (rdd, p) => rdd.iterator(p, context)
     }
   }
 
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdds = null
+  }
+
   // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
   private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
     rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
   }
 }
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61f4bbda/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 3b56e45..fa33a56 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -40,7 +40,7 @@ private[spark] object CheckpointState extends Enumeration {
  * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
  * of the checkpointed RDD.
  */
-private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
+private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
   extends Logging with Serializable {
 
   import CheckpointState._

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61f4bbda/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f25d921..81046af 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -57,15 +57,15 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("RDDs with one-to-one dependencies") {
-    testCheckpointing(_.map(x => x.toString))
-    testCheckpointing(_.flatMap(x => 1 to x))
-    testCheckpointing(_.filter(_ % 2 == 0))
-    testCheckpointing(_.sample(false, 0.5, 0))
-    testCheckpointing(_.glom())
-    testCheckpointing(_.mapPartitions(_.map(_.toString)))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
-    testCheckpointing(_.pipe(Seq("cat")))
+    testRDD(_.map(x => x.toString))
+    testRDD(_.flatMap(x => 1 to x))
+    testRDD(_.filter(_ % 2 == 0))
+    testRDD(_.sample(false, 0.5, 0))
+    testRDD(_.glom())
+    testRDD(_.mapPartitions(_.map(_.toString)))
+    testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
+    testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
+    testRDD(_.pipe(Seq("cat")))
   }
 
   test("ParallelCollection") {
@@ -97,7 +97,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("ShuffledRDD") {
-    testCheckpointing(rdd => {
+    testRDD(rdd => {
       // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
       new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
     })
@@ -105,25 +105,17 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("UnionRDD") {
     def otherRDD = sc.makeRDD(1 to 10, 1)
-
-    // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed.
-    // Current implementation of UnionRDD has transient reference to parent RDDs,
-    // so only the partitions will reduce in serialized size, not the RDD.
-    testCheckpointing(_.union(otherRDD), false, true)
-    testParentCheckpointing(_.union(otherRDD), false, true)
+    testRDD(_.union(otherRDD))
+    testRDDPartitions(_.union(otherRDD))
   }
 
   test("CartesianRDD") {
     def otherRDD = sc.makeRDD(1 to 10, 1)
-    testCheckpointing(new CartesianRDD(sc, _, otherRDD))
-
-    // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
-    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
+    testRDD(new CartesianRDD(sc, _, otherRDD))
+    testRDDPartitions(new CartesianRDD(sc, _, otherRDD))
 
     // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after
-    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
+    // the parent RDD has been checkpointed and parent partitions have been changed.
     // Note that this test is very specific to the current implementation of CartesianRDD.
     val ones = sc.makeRDD(1 to 100, 10).map(x => x)
     ones.checkpoint() // checkpoint that MappedRDD
@@ -134,23 +126,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val splitAfterCheckpoint =
       serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
     assert(
-      (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
-        (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
-      "CartesianRDD.parents not updated after parent RDD checkpointed"
+      (splitAfterCheckpoint.s1.getClass != splitBeforeCheckpoint.s1.getClass) &&
+        (splitAfterCheckpoint.s2.getClass != splitBeforeCheckpoint.s2.getClass),
+      "CartesianRDD.s1 and CartesianRDD.s2 not updated after parent RDD is checkpointed"
     )
   }
 
   test("CoalescedRDD") {
-    testCheckpointing(_.coalesce(2))
+    testRDD(_.coalesce(2))
+    testRDDPartitions(_.coalesce(2))
 
-    // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
-    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(_.coalesce(2), true, false)
-
-    // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after
-    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
-    // Note that this test is very specific to the current implementation of CoalescedRDDPartitions
+    // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents)
+    // after the parent RDD has been checkpointed and parent partitions have been changed.
+    // Note that this test is very specific to the current implementation of
+    // CoalescedRDDPartitions.
     val ones = sc.makeRDD(1 to 100, 10).map(x => x)
     ones.checkpoint() // checkpoint that MappedRDD
     val coalesced = new CoalescedRDD(ones, 2)
@@ -160,33 +149,78 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val splitAfterCheckpoint =
       serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
     assert(
-      splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
-      "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
+      splitAfterCheckpoint.parents.head.getClass != splitBeforeCheckpoint.parents.head.getClass,
+      "CoalescedRDDPartition.parents not updated after parent RDD is checkpointed"
     )
   }
 
   test("CoGroupedRDD") {
-    val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
-    testCheckpointing(rdd => {
+    val longLineageRDD1 = generateFatPairRDD()
+    testRDD(rdd => {
       CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
-    }, false, true)
+    })
 
-    val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
-    testParentCheckpointing(rdd => {
+    val longLineageRDD2 = generateFatPairRDD()
+    testRDDPartitions(rdd => {
       CheckpointSuite.cogroup(
         longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
-    }, false, true)
+    })
   }
 
   test("ZippedRDD") {
-    testCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-
-    // Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed
-    // Current implementation of ZippedRDDPartitions has transient references to parent RDDs,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
+    testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
+    testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
+
+    // Test that the ZippedPartition updates parent partitions
+    // after the parent RDD has been checkpointed and parent partitions have been changed.
+    // Note that this test is very specific to the current implementation of ZippedRDD.
+    val rdd = generateFatRDD()
+    val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x))
+    zippedRDD.rdd1.checkpoint()
+    zippedRDD.rdd2.checkpoint()
+    val partitionBeforeCheckpoint =
+      serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
+    zippedRDD.count()
+    val partitionAfterCheckpoint =
+      serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
+    assert(
+      partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass &&
+        partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass,
+      "ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed"
+    )
+  }
+
+  test("PartitionerAwareUnionRDD") {
+    testRDD(rdd => {
+      new PartitionerAwareUnionRDD[(Int, Int)](sc, Array(
+        generateFatPairRDD(),
+        rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+      ))
+    })
+
+    testRDDPartitions(rdd => {
+      new PartitionerAwareUnionRDD[(Int, Int)](sc, Array(
+        generateFatPairRDD(),
+        rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+      ))
+    })
+
+    // Test that the PartitionerAwareUnionRDDPartition updates its parent partitions
+    // (PartitionerAwareUnionRDDPartition.parents) after the parent RDD has been checkpointed
+    // and the parent partitions have been changed. Note that this test is very specific to the
+    // current implementation of PartitionerAwareUnionRDD.
+    val pairRDD = generateFatPairRDD()
+    pairRDD.checkpoint()
+    val unionRDD = new PartitionerAwareUnionRDD(sc, Array(pairRDD))
+    val partitionBeforeCheckpoint =  serializeDeserialize(
+      unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
+    pairRDD.count()
+    val partitionAfterCheckpoint =  serializeDeserialize(
+      unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
+    assert(
+      partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass,
+      "PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed"
+    )
   }
 
   test("CheckpointRDD with zero partitions") {
@@ -200,29 +234,32 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   /**
-   * Test checkpointing of the final RDD generated by the given operation. By default,
-   * this method tests whether the size of serialized RDD has reduced after checkpointing or not.
-   * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or
-   * not, but this is not done by default as usually the partitions do not refer to any RDD and
-   * therefore never store the lineage.
+   * Test checkpointing of the RDD generated by the given operation. It tests whether the
+   * serialized size of the RDD is reduced after checkpointing. This function should be called
+   * on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
    */
-  def testCheckpointing[U: ClassTag](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean = true,
-      testRDDPartitionSize: Boolean = false
-    ) {
+  def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
     // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
+    val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
     val parentRDD = operatedRDD.dependencies.headOption.orNull
     val rddType = operatedRDD.getClass.getSimpleName
     val numPartitions = operatedRDD.partitions.length
 
+    // Force initialization of all the data structures in RDDs
+    // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
+    initializeRdd(operatedRDD)
+
+    val partitionsBeforeCheckpoint = operatedRDD.partitions
+
     // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     operatedRDD.checkpoint()
     val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
+    val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the checkpoint file has been created
     assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
@@ -230,6 +267,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     // Test whether dependencies have been changed from its earlier parent RDD
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
 
+    // Test whether the partitions have been changed from its earlier partitions
+    assert(operatedRDD.partitions.toList != partitionsBeforeCheckpoint.toList)
+
     // Test whether the partitions have been changed to the new Hadoop partitions
     assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
 
@@ -239,122 +279,72 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
 
-    // Test whether serialized size of the RDD has reduced. If the RDD
-    // does not have any dependency to another RDD (e.g., ParallelCollection,
-    // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
-    if (testRDDSize) {
-      logInfo("Size of " + rddType +
-        "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing " +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
+    // Test whether serialized size of the RDD has reduced.
+    logInfo("Size of " + rddType +
+      " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
+    assert(
+      rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+      "Size of " + rddType + " did not reduce after checkpointing " +
+        " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+    )
 
-    // Test whether serialized size of the partitions has reduced. If the partitions
-    // do not have any non-transient reference to another RDD or another RDD's partitions, it
-    // does not refer to a lineage and therefore may not reduce in size after checkpointing.
-    // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions
-    // must be forgotten after checkpointing (to remove all reference to parent RDDs) and
-    // replaced with the HadooPartitions of the checkpointed RDD.
-    if (testRDDPartitionSize) {
-      logInfo("Size of " + rddType + " partitions "
-        + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing " +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
   }
 
   /**
    * Test whether checkpointing of the parent of the generated RDD also
    * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent
    * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
-   * this RDD will remember the partitions and therefore potentially the whole lineage.
+   * the generated RDD will remember the partitions and therefore potentially the whole lineage.
+   * This function should be called only on those RDDs whose partitions refer to the parent RDD's
+   * partitions (i.e., do not call it on simple RDDs like MappedRDD).
+   *
    */
-  def testParentCheckpointing[U: ClassTag](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean,
-      testRDDPartitionSize: Boolean
-    ) {
+  def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
     // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
+    val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
-    val parentRDD = operatedRDD.dependencies.head.rdd
+    val parentRDDs = operatedRDD.dependencies.map(_.rdd)
     val rddType = operatedRDD.getClass.getSimpleName
-    val parentRDDType = parentRDD.getClass.getSimpleName
 
-    // Get the partitions and dependencies of the parent in case they're lazily computed
-    parentRDD.dependencies
-    parentRDD.partitions
+    // Force initialization of all the data structures in RDDs
+    // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
+    initializeRdd(operatedRDD)
 
     // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
-    parentRDD.checkpoint()  // checkpoint the parent RDD, not the generated one
-    val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+    parentRDDs.foreach(_.checkpoint())  // checkpoint the parent RDD, not the generated one
+    val result = operatedRDD.collect()  // force checkpointing
+    operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
+    val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
 
-    // Test whether serialized size of the RDD has reduced because of its parent being
-    // checkpointed. If this RDD or its parent RDD do not have any dependency
-    // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
-    // not reduce in size after checkpointing.
-    if (testRDDSize) {
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
-
-    // Test whether serialized size of the partitions has reduced because of its parent being
-    // checkpointed. If the partitions do not have any non-transient reference to another RDD
-    // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce
-    // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent
-    // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's
-    // partitions must have changed after checkpointing.
-    if (testRDDPartitionSize) {
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
-
+    // Test whether serialized size of the partitions has reduced
+    logInfo("Size of partitions of " + rddType +
+      " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]")
+    assert(
+      partitionSizeAfterCheckpoint < partitionSizeBeforeCheckpoint,
+      "Size of " + rddType + " partitions did not reduce after checkpointing parent RDDs" +
+        " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]"
+    )
   }
 
   /**
-   * Generate an RDD with a long lineage of one-to-one dependencies.
+   * Generate an RDD such that both the RDD and its partitions have large size.
    */
-  def generateLongLineageRDD(): RDD[Int] = {
-    var rdd = sc.makeRDD(1 to 100, 4)
-    for (i <- 1 to 50) {
-      rdd = rdd.map(x => x + 1)
-    }
-    rdd
+  def generateFatRDD(): RDD[Int] = {
+    new FatRDD(sc.makeRDD(1 to 100, 4)).map(x => x)
   }
 
   /**
-   * Generate an RDD with a long lineage specifically for CoGroupedRDD.
-   * A CoGroupedRDD can have a long lineage only one of its parents have a long lineage
-   * and narrow dependency with this RDD. This method generate such an RDD by a sequence
-   * of cogroups and mapValues which creates a long lineage of narrow dependencies.
+   * Generate a pair RDD (with partitioner) such that both the RDD and its partitions
+   * have large size.
    */
-  def generateLongLineageRDDForCoGroupedRDD() = {
-    val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
-
-    def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
-
-    var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
-    for(i <- 1 to 10) {
-      cogrouped = cogrouped.mapValues(add).cogroup(ones)
-    }
-    cogrouped.mapValues(add)
+  def generateFatPairRDD() = {
+    new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
   }
 
   /**
@@ -362,8 +352,26 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
    * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
    */
   def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
-    (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
-     Utils.serialize(rdd.partitions).length)
+    val rddSize = Utils.serialize(rdd).size
+    val rddCpDataSize = Utils.serialize(rdd.checkpointData).size
+    val rddPartitionSize = Utils.serialize(rdd.partitions).size
+    val rddDependenciesSize = Utils.serialize(rdd.dependencies).size
+
+    // Print detailed size, helps in debugging
+    logInfo("Serialized sizes of " + rdd +
+      ": RDD = " + rddSize +
+      ", RDD checkpoint data = " + rddCpDataSize +
+      ", RDD partitions = " + rddPartitionSize +
+      ", RDD dependencies = " + rddDependenciesSize
+    )
+    // this makes sure that serializing the RDD's checkpoint data does not
+    // serialize the whole RDD as well
+    assert(
+      rddSize > rddCpDataSize,
+      "RDD's checkpoint data (" + rddCpDataSize  + ") is equal or larger than the " +
+        "whole RDD with checkpoint data (" + rddSize + ")"
+    )
+    (rddSize - rddCpDataSize, rddPartitionSize)
   }
 
   /**
@@ -375,8 +383,49 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val bytes = Utils.serialize(obj)
     Utils.deserialize[T](bytes)
   }
+
+  /**
+   * Recursively force the initialization of all the members of an RDD and its parents.
+   */
+  def initializeRdd(rdd: RDD[_]) {
+    rdd.partitions // forces the partitions to be computed
+    rdd.dependencies.map(_.rdd).foreach(initializeRdd(_))
+  }
 }
 
+/** RDD partition that has large serialized size. */
+class FatPartition(val partition: Partition) extends Partition {
+  val bigData = new Array[Byte](10000)
+  def index: Int = partition.index
+}
+
+/** RDD that has large serialized size. */
+class FatRDD(parent: RDD[Int]) extends RDD[Int](parent) {
+  val bigData = new Array[Byte](100000)
+
+  protected def getPartitions: Array[Partition] = {
+    parent.partitions.map(p => new FatPartition(p))
+  }
+
+  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
+    parent.compute(split.asInstanceOf[FatPartition].partition, context)
+  }
+}
+
+/** Pair RDD that has large serialized size. */
+class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int, Int)](parent) {
+  val bigData = new Array[Byte](100000)
+
+  protected def getPartitions: Array[Partition] = {
+    parent.partitions.map(p => new FatPartition(p))
+  }
+
+  @transient override val partitioner = Some(_partitioner)
+
+  def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
+    parent.compute(split.asInstanceOf[FatPartition].partition, context).map(x => (x, x))
+  }
+}
 
 object CheckpointSuite {
   // This is a custom cogroup function that does not use mapValues like