Posted to commits@spark.apache.org by an...@apache.org on 2016/11/21 21:08:39 UTC

spark git commit: [SPARK-18361][PYSPARK] Expose RDD localCheckpoint in PySpark

Repository: spark
Updated Branches:
  refs/heads/master 07beb5d21 -> 70176871a


[SPARK-18361][PYSPARK] Expose RDD localCheckpoint in PySpark

## What changes were proposed in this pull request?

Expose RDD's localCheckpoint() and associated functions in PySpark.
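
For context, a minimal usage sketch of the newly exposed API (not part of this patch; it assumes an active SparkContext named `sc`):

    # Minimal sketch, assuming an active SparkContext `sc`.
    rdd = sc.parallelize(range(100)).map(lambda x: x * x)

    rdd.localCheckpoint()                # mark the RDD for local checkpointing
    print(rdd.isLocallyCheckpointed())   # True: the RDD is marked

    rdd.count()                          # an action materializes the checkpoint
    print(rdd.isCheckpointed())          # True once materialized (see the test below)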

## How was this patch tested?

I added a unit test in python/pyspark/tests.py, which passes.

I certify that this is my original work, and I license it to the project under the project's open source license.

Gabriel HUANG
Developer at Cardabel (http://cardabel.com/)

Author: Gabriel Huang <ga...@gmail.com>

Closes #15811 from gabrielhuang/pyspark-localcheckpoint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70176871
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70176871
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70176871

Branch: refs/heads/master
Commit: 70176871ae10509f1a727a96e96b3da7762605b1
Parents: 07beb5d
Author: Gabriel Huang <ga...@gmail.com>
Authored: Mon Nov 21 16:08:34 2016 -0500
Committer: Andrew Or <an...@gmail.com>
Committed: Mon Nov 21 16:08:34 2016 -0500

----------------------------------------------------------------------
 python/pyspark/rdd.py   | 33 ++++++++++++++++++++++++++++++++-
 python/pyspark/tests.py | 17 +++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/70176871/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 641787e..f21a364 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -263,13 +263,44 @@ class RDD(object):
 
     def isCheckpointed(self):
         """
-        Return whether this RDD has been checkpointed or not
+        Return whether this RDD is checkpointed and materialized, either reliably or locally.
         """
         return self._jrdd.rdd().isCheckpointed()
 
+    def localCheckpoint(self):
+        """
+        Mark this RDD for local checkpointing using Spark's existing caching layer.
+
+        This method is for users who wish to truncate RDD lineages while skipping the expensive
+        step of replicating the materialized data in a reliable distributed file system. This is
+        useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
+
+        Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
+        data is written to ephemeral local storage in the executors instead of to a reliable,
+        fault-tolerant storage. The effect is that if an executor fails during the computation,
+        the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
+
+        This is NOT safe to use with dynamic allocation, which removes executors along
+        with their cached blocks. If you must use both features, you are advised to set
+        L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
+
+        The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
+        """
+        self._jrdd.rdd().localCheckpoint()
+
+    def isLocallyCheckpointed(self):
+        """
+        Return whether this RDD is marked for local checkpointing.
+
+        Exposed for testing.
+        """
+        return self._jrdd.rdd().isLocallyCheckpointed()
+
     def getCheckpointFile(self):
         """
         Gets the name of the file to which this RDD was checkpointed
+
+        Not defined if RDD is checkpointed locally.
         """
         checkpointFile = self._jrdd.rdd().getCheckpointFile()
         if checkpointFile.isDefined():
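
The localCheckpoint() docstring above cautions against combining local checkpointing with dynamic allocation and suggests raising spark.dynamicAllocation.cachedExecutorIdleTimeout. A rough sketch of that configuration at context creation (the values are illustrative only, not part of this patch):

    from pyspark import SparkConf, SparkContext

    # Illustrative settings only: keep idle executors that hold cached or locally
    # checkpointed blocks around longer when dynamic allocation is enabled.
    conf = (SparkConf()
            .set("spark.dynamicAllocation.enabled", "true")
            .set("spark.shuffle.service.enabled", "true")
            .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "2h"))
    sc = SparkContext(conf=conf)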

http://git-wip-us.apache.org/repos/asf/spark/blob/70176871/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3e0bd16..ab4bef8 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -390,6 +390,23 @@ class CheckpointTests(ReusedPySparkTestCase):
         self.assertEqual([1, 2, 3, 4], recovered.collect())
 
 
+class LocalCheckpointTests(ReusedPySparkTestCase):
+
+    def test_basic_localcheckpointing(self):
+        parCollection = self.sc.parallelize([1, 2, 3, 4])
+        flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
+
+        self.assertFalse(flatMappedRDD.isCheckpointed())
+        self.assertFalse(flatMappedRDD.isLocallyCheckpointed())
+
+        flatMappedRDD.localCheckpoint()
+        result = flatMappedRDD.collect()
+        time.sleep(1)  # 1 second
+        self.assertTrue(flatMappedRDD.isCheckpointed())
+        self.assertTrue(flatMappedRDD.isLocallyCheckpointed())
+        self.assertEqual(flatMappedRDD.collect(), result)
+
+
 class AddFileTests(PySparkTestCase):
 
     def test_add_py_file(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org