Posted to commits@spark.apache.org by td...@apache.org on 2017/01/26 01:15:23 UTC

spark git commit: [SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD

Repository: spark
Updated Branches:
  refs/heads/master 965c82d8c -> 47d5d0ddb


[SPARK-14804][SPARK][GRAPHX] Fix checkpointing of VertexRDD/EdgeRDD

## What changes were proposed in this pull request?

EdgeRDD/VertexRDD override checkpoint() and isCheckpointed() to forward them to the internal partitionsRDD. So when checkpoint() is called on them, it is the partitionsRDD that actually gets checkpointed. However, since isCheckpointed() is also overridden to call partitionsRDD.isCheckpointed, EdgeRDD/VertexRDD.isCheckpointed returns true even though the EdgeRDD/VertexRDD itself is not actually checkpointed.
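
Roughly, the forwarding pattern looks like the following self-contained sketch (a hypothetical `WrapperRDD`, not the actual GraphX classes, which mirrors how VertexRDDImpl/EdgeRDDImpl delegate to their partitionsRDD):

```scala
import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical wrapper mirroring how VertexRDDImpl/EdgeRDDImpl forward
// checkpoint()/isCheckpointed to the RDD they wrap (partitionsRDD in GraphX).
class WrapperRDD[T: ClassTag](inner: RDD[T])
  extends RDD[T](inner.context, List(new OneToOneDependency(inner))) {

  override protected def getPartitions: Array[Partition] = inner.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split, context)

  // Only the wrapped RDD is ever checkpointed...
  override def checkpoint(): Unit = inner.checkpoint()

  // ...but this RDD still reports itself as checkpointed once the wrapped RDD is.
  override def isCheckpointed: Boolean = inner.isCheckpointed
}
```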

This would have been fine, except that the RDD's internal logic for computing its partitions depends on isCheckpointed(). So for VertexRDD/EdgeRDD, because isCheckpointed is true, Spark tries to read checkpoint data of the VertexRDD/EdgeRDD during computation even though they are not actually checkpointed. Through a convoluted sequence of call forwarding, it ends up reading the checkpoint data of partitionsRDD and trying to cast it to the element types of VertexRDD/EdgeRDD, which leads to a ClassCastException.
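
For context, the internal read path that consults this flag looks roughly like the following (a paraphrased sketch of the relevant RDD method, not verbatim source):

```scala
// Paraphrased sketch of RDD's internal "compute or read checkpoint" path.
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    // Before this fix, isCheckpointedAndMaterialized was just an alias for the public,
    // overridable isCheckpointed, so a VertexRDD/EdgeRDD took this branch and read its
    // firstParent (the partitionsRDD) as if it held the outer RDD's element type.
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
```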

The minimal fix that does not change any public behavior is to modify the RDD internals so that internal logic does not rely on the public, overridable API.

## How was this patch tested?

New unit tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #15396 from tdas/SPARK-14804.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47d5d0dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47d5d0dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47d5d0dd

Branch: refs/heads/master
Commit: 47d5d0ddb06c7d2c86515d9556c41dc80081f560
Parents: 965c82d
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Jan 25 17:17:34 2017 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jan 25 17:17:34 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  5 ++--
 .../org/apache/spark/graphx/EdgeRDDSuite.scala  | 27 ++++++++++++++++++++
 .../apache/spark/graphx/VertexRDDSuite.scala    | 26 +++++++++++++++++++
 3 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/47d5d0dd/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a7e01f3..0359508 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1610,14 +1610,15 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    */
-  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
+  def isCheckpointed: Boolean = isCheckpointedAndMaterialized
 
   /**
    * Return whether this RDD is checkpointed and materialized, either reliably or locally.
    * This is introduced as an alias for `isCheckpointed` to clarify the semantics of the
    * return value. Exposed for testing.
    */
-  private[spark] def isCheckpointedAndMaterialized: Boolean = isCheckpointed
+  private[spark] def isCheckpointedAndMaterialized: Boolean =
+    checkpointData.exists(_.isCheckpointed)
 
   /**
    * Return whether this RDD is marked for local checkpointing.

http://git-wip-us.apache.org/repos/asf/spark/blob/47d5d0dd/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index f1ecc9e..7a24e32 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -33,4 +34,30 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpointing") {
+    withSpark { sc =>
+      val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+      val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      edges.checkpoint()
+
+      // EdgeRDD not yet checkpointed
+      assert(!edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(!edges.partitionsRDD.isCheckpointed)
+      assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = edges.collect().toSeq // force checkpointing
+
+      // EdgeRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(edges.isCheckpointed)
+      assert(!edges.isCheckpointedAndMaterialized)
+      assert(edges.partitionsRDD.isCheckpointed)
+      assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(edges.collect().toSeq ===  data) // test checkpointed RDD
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47d5d0dd/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 0bb9e0a..8e63043 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -197,4 +198,29 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("checkpoint") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n)
+      sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
+      verts.checkpoint()
+
+      // VertexRDD not yet checkpointed
+      assert(!verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(!verts.partitionsRDD.isCheckpointed)
+      assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      val data = verts.collect().toSeq // force checkpointing
+
+      // VertexRDD shows up as checkpointed, but internally it is not.
+      // Only internal partitionsRDD is checkpointed.
+      assert(verts.isCheckpointed)
+      assert(!verts.isCheckpointedAndMaterialized)
+      assert(verts.partitionsRDD.isCheckpointed)
+      assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
+
+      assert(verts.collect().toSeq === data) // test checkpointed RDD
+    }
+  }
 }

