Posted to commits@spark.apache.org by gu...@apache.org on 2019/01/09 03:55:25 UTC

[spark] branch master updated: [SPARK-26549][PYSPARK] Fix Python worker reuse taking no effect for parallelize over a lazy iterable range

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dbbba80  [SPARK-26549][PYSPARK] Fix Python worker reuse taking no effect for parallelize over a lazy iterable range
dbbba80 is described below

commit dbbba80b3cb319b147dcf82a69963eee662e289f
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Wed Jan 9 11:55:12 2019 +0800

    [SPARK-26549][PYSPARK] Fix Python worker reuse taking no effect for parallelize over a lazy iterable range
    
    ## What changes were proposed in this pull request?
    
    During the follow-up work (#23435) for the PySpark worker reuse scenario, we found that worker reuse takes no effect for `sc.parallelize(xrange(...))`. This happens because the specialized rdd.parallelize logic for xrange (introduced in #3264) generates its data from a lazy iterable range and never consumes the passed-in iterator, which breaks the end-of-stream check in the Python worker and ultimately prevents worker reuse from taking effect. See more details in [SPARK-26549](https://issue [...]
    
    We fix this by forcing consumption of the passed-in iterator.
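    
    In sketch form (the actual change is in the diff below; getStart and step come from the surrounding parallelize code), the partition function for the xrange case now drains the incoming, empty iterator before returning the range:
    
        def f(split, iterator):
            # Drain the empty incoming iterator so that control enters
            # FramedSerializer.load_stream and the END_OF_DATA_SECTION marker
            # is consumed, which is what allows the Python worker to be reused.
            assert len(list(iterator)) == 0
            return xrange(getStart(split), getStart(split + 1), step)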
    
    ## How was this patch tested?
    New unit test in test_worker.py.
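    
    As a quick illustration of what the test checks (a sketch, assuming an interactive SparkContext `sc` and the default spark.python.worker.reuse=true), the worker PIDs seen by a second job over the same RDD should all come from the workers used by the first job:
    
        import os
    
        # xrange on Python 2; on Python 3 the built-in range takes the same
        # code path in SparkContext.parallelize.
        rdd = sc.parallelize(xrange(20), 8)
        previous_pids = rdd.map(lambda x: os.getpid()).collect()
        current_pids = rdd.map(lambda x: os.getpid()).collect()
        # With worker reuse effective, no new worker processes are spawned
        # for the second job.
        assert set(current_pids).issubset(set(previous_pids))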
    
    Closes #23470 from xuanyuanking/SPARK-26549.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/context.py           |  8 ++++++++
 python/pyspark/tests/test_worker.py | 12 +++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 64178eb..316fbc8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -498,6 +498,14 @@ class SparkContext(object):
                 return start0 + int((split * size / numSlices)) * step
 
             def f(split, iterator):
+                # The iterator is empty here, but we need this line to trigger the
+                # signal-handling logic in FramedSerializer.load_stream, for instance
+                # SpecialLengths.END_OF_DATA_SECTION in _read_with_length. Since
+                # FramedSerializer.load_stream produces a generator, control needs to
+                # enter that function at least once. Here we do that by explicitly
+                # converting the empty iterator to a list, thus making sure worker
+                # reuse takes effect. See more details in SPARK-26549.
+                assert len(list(iterator)) == 0
                 return xrange(getStart(split), getStart(split + 1), step)
 
             return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
diff --git a/python/pyspark/tests/test_worker.py b/python/pyspark/tests/test_worker.py
index a33b77d..a4f108f 100644
--- a/python/pyspark/tests/test_worker.py
+++ b/python/pyspark/tests/test_worker.py
@@ -22,7 +22,7 @@ import time
 
 from py4j.protocol import Py4JJavaError
 
-from pyspark.testing.utils import ReusedPySparkTestCase, QuietTest
+from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest
 
 if sys.version_info[0] >= 3:
     xrange = range
@@ -145,6 +145,16 @@ class WorkerTests(ReusedPySparkTestCase):
             self.sc.pythonVer = version
 
 
+class WorkerReuseTest(PySparkTestCase):
+
+    def test_reuse_worker_of_parallelize_xrange(self):
+        rdd = self.sc.parallelize(xrange(20), 8)
+        previous_pids = rdd.map(lambda x: os.getpid()).collect()
+        current_pids = rdd.map(lambda x: os.getpid()).collect()
+        for pid in current_pids:
+            self.assertTrue(pid in previous_pids)
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.tests.test_worker import *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org