You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/04/10 03:34:59 UTC
spark git commit: [SPARK-13687][PYTHON] Cleanup PySpark parallelize
temporary files
Repository: spark
Updated Branches:
refs/heads/master 5989c85b5 -> 00288ea2a
[SPARK-13687][PYTHON] Cleanup PySpark parallelize temporary files
## What changes were proposed in this pull request?
Eagerly clean up the temporary files created by PySpark's parallelize rather than waiting for shutdown.
## How was this patch tested?
Unit tests
Author: Holden Karau <ho...@us.ibm.com>
Closes #12233 from holdenk/SPARK-13687-cleanup-pyspark-temporary-files.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00288ea2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00288ea2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00288ea2
Branch: refs/heads/master
Commit: 00288ea2a463180e91fd16c8e2b627e69566e1f0
Parents: 5989c85
Author: Holden Karau <ho...@us.ibm.com>
Authored: Sun Apr 10 02:34:54 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Apr 10 02:34:54 2016 +0100
----------------------------------------------------------------------
python/pyspark/context.py | 22 +++++++++++++---------
python/pyspark/tests.py | 7 +++++++
2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/00288ea2/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 529d16b..cb15b4b 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -428,15 +428,19 @@ class SparkContext(object):
# because it sends O(n) Py4J commands. As an alternative, serialized
# objects are written to a file and loaded through textFile().
tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
- # Make sure we distribute data evenly if it's smaller than self.batchSize
- if "__len__" not in dir(c):
- c = list(c) # Make it a list so we can compute its length
- batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
- serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
- serializer.dump_stream(c, tempFile)
- tempFile.close()
- readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
- jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
+ try:
+ # Make sure we distribute data evenly if it's smaller than self.batchSize
+ if "__len__" not in dir(c):
+ c = list(c) # Make it a list so we can compute its length
+ batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
+ serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
+ serializer.dump_stream(c, tempFile)
+ tempFile.close()
+ readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
+ jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
+ finally:
+ # readRDDFromFile eagerily reads the file so we can delete right after.
+ os.unlink(tempFile.name)
return RDD(jrdd, self, serializer)
def pickleFile(self, name, minPartitions=None):
http://git-wip-us.apache.org/repos/asf/spark/blob/00288ea2/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 15c87e2..97ea39d 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1914,6 +1914,13 @@ class ContextTests(unittest.TestCase):
with SparkContext.getOrCreate() as sc:
self.assertTrue(SparkContext.getOrCreate() is sc)
+ def test_parallelize_eager_cleanup(self):
+ with SparkContext() as sc:
+ temp_files = os.listdir(sc._temp_dir)
+ rdd = sc.parallelize([0, 1, 2])
+ post_parallalize_temp_files = os.listdir(sc._temp_dir)
+ self.assertEqual(temp_files, post_parallalize_temp_files)
+
def test_stop(self):
sc = SparkContext()
self.assertNotEqual(SparkContext._active_spark_context, None)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org