You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/11/21 21:08:39 UTC
spark git commit: [SPARK-18361][PYSPARK] Expose RDD localCheckpoint in PySpark
Repository: spark
Updated Branches:
refs/heads/master 07beb5d21 -> 70176871a
[SPARK-18361][PYSPARK] Expose RDD localCheckpoint in PySpark
## What changes were proposed in this pull request?
Expose RDD's localCheckpoint() and associated functions in PySpark.
## How was this patch tested?
I added a unit test in python/pyspark/tests.py, which passes.
I certify that this is my original work, and I license it to the project under the project's open source license.
Gabriel HUANG
Developer at Cardabel (http://cardabel.com/)
Author: Gabriel Huang <ga...@gmail.com>
Closes #15811 from gabrielhuang/pyspark-localcheckpoint.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70176871
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70176871
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70176871
Branch: refs/heads/master
Commit: 70176871ae10509f1a727a96e96b3da7762605b1
Parents: 07beb5d
Author: Gabriel Huang <ga...@gmail.com>
Authored: Mon Nov 21 16:08:34 2016 -0500
Committer: Andrew Or <an...@gmail.com>
Committed: Mon Nov 21 16:08:34 2016 -0500
----------------------------------------------------------------------
python/pyspark/rdd.py | 33 ++++++++++++++++++++++++++++++++-
python/pyspark/tests.py | 17 +++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/70176871/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 641787e..f21a364 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -263,13 +263,44 @@ class RDD(object):
def isCheckpointed(self):
"""
- Return whether this RDD has been checkpointed or not
+ Return whether this RDD is checkpointed and materialized, either reliably or locally.
"""
return self._jrdd.rdd().isCheckpointed()
+ def localCheckpoint(self):
+ """
+ Mark this RDD for local checkpointing using Spark's existing caching layer.
+
+ This method is for users who wish to truncate RDD lineages while skipping the expensive
+ step of replicating the materialized data in a reliable distributed file system. This is
+ useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
+
+ Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
+ data is written to ephemeral local storage in the executors instead of to a reliable,
+ fault-tolerant storage. The effect is that if an executor fails during the computation,
+ the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
+
+ This is NOT safe to use with dynamic allocation, which removes executors along
+ with their cached blocks. If you must use both features, you are advised to set
+ L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
+
+ The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
+ """
+ self._jrdd.rdd().localCheckpoint()
+
+ def isLocallyCheckpointed(self):
+ """
+ Return whether this RDD is marked for local checkpointing.
+
+ Exposed for testing.
+ """
+ return self._jrdd.rdd().isLocallyCheckpointed()
+
def getCheckpointFile(self):
"""
Gets the name of the file to which this RDD was checkpointed
+
+ Not defined if RDD is checkpointed locally.
"""
checkpointFile = self._jrdd.rdd().getCheckpointFile()
if checkpointFile.isDefined():
http://git-wip-us.apache.org/repos/asf/spark/blob/70176871/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3e0bd16..ab4bef8 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -390,6 +390,23 @@ class CheckpointTests(ReusedPySparkTestCase):
self.assertEqual([1, 2, 3, 4], recovered.collect())
+class LocalCheckpointTests(ReusedPySparkTestCase):
+
+ def test_basic_localcheckpointing(self):
+ parCollection = self.sc.parallelize([1, 2, 3, 4])
+ flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
+
+ self.assertFalse(flatMappedRDD.isCheckpointed())
+ self.assertFalse(flatMappedRDD.isLocallyCheckpointed())
+
+ flatMappedRDD.localCheckpoint()
+ result = flatMappedRDD.collect()
+ time.sleep(1) # 1 second
+ self.assertTrue(flatMappedRDD.isCheckpointed())
+ self.assertTrue(flatMappedRDD.isLocallyCheckpointed())
+ self.assertEqual(flatMappedRDD.collect(), result)
+
+
class AddFileTests(PySparkTestCase):
def test_add_py_file(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org