Posted to commits@spark.apache.org by gu...@apache.org on 2019/01/09 03:55:25 UTC
[spark] branch master updated: [SPARK-26549][PYSPARK] Fix for python worker reuse take no effect for parallelize lazy iterable range
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dbbba80 [SPARK-26549][PYSPARK] Fix for python worker reuse take no effect for parallelize lazy iterable range
dbbba80 is described below
commit dbbba80b3cb319b147dcf82a69963eee662e289f
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Wed Jan 9 11:55:12 2019 +0800
[SPARK-26549][PYSPARK] Fix for python worker reuse take no effect for parallelize lazy iterable range
## What changes were proposed in this pull request?
During the follow-up work (#23435) for the PySpark worker reuse scenario, we found that worker reuse has no effect for `sc.parallelize(xrange(...))`. This happens because the specialized `rdd.parallelize` logic for xrange (introduced in #3264) generates each partition's data from a lazy iterable range and never consumes the passed-in iterator. As a result, the end-of-data signal that follows the partition input is never read, which breaks the end-of-stream check in the Python worker, so the worker is not reused. See more details in [SPARK-26549](https://issue [...]
We fix this by forcing the passed-in iterator to be consumed.
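To illustrate why draining the iterator matters, here is a simplified, self-contained model in plain Python. It is a sketch under stated assumptions, not PySpark's actual worker code: the marker values, the framing format, and the helpers `load_stream`, `build_input`, and `run_task` are hypothetical stand-ins for FramedSerializer.load_stream and the worker's end-of-stream check. The input carries length-framed records, then an end-of-data marker, then an end-of-stream marker; the worker is treated as reusable only if the integer read after the task finishes is the end-of-stream marker.

```python
import io
import struct

# Hypothetical marker values, loosely modeled on PySpark's SpecialLengths.
# They are assumptions for this simulation, not the real protocol.
END_OF_DATA_SECTION = -1
END_OF_STREAM = -4


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]


def load_stream(stream):
    """Lazily yield length-framed payloads until the end-of-data marker."""
    while True:
        length = read_int(stream)
        if length == END_OF_DATA_SECTION:
            return  # the marker is consumed only if iteration gets this far
        yield stream.read(length)


def build_input(payloads):
    """Frame payloads, then append the end-of-data and end-of-stream markers."""
    buf = io.BytesIO()
    for p in payloads:
        write_int(len(p), buf)
        buf.write(p)
    write_int(END_OF_DATA_SECTION, buf)
    write_int(END_OF_STREAM, buf)
    buf.seek(0)
    return buf


def run_task(func, stream):
    """Run func on the lazy input, then check whether the worker is reusable."""
    result = list(func(load_stream(stream)))
    reusable = read_int(stream) == END_OF_STREAM
    return result, reusable


def ignores_input(iterator):
    # Like the old xrange fast path: output does not depend on the input.
    return range(3)


def drains_input(iterator):
    # Like the fix: consume the (empty) input before producing output.
    assert len(list(iterator)) == 0
    return range(3)
```

With an empty input, `run_task(ignores_input, build_input([]))` returns `([0, 1, 2], False)`: the generator was never advanced, so the unread end-of-data marker is what the end-of-stream check sees. `run_task(drains_input, build_input([]))` returns `([0, 1, 2], True)`, because converting the iterator to a list drives the generator until it consumes the marker.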
## How was this patch tested?
New unit test in test_worker.py.
Closes #23470 from xuanyuanking/SPARK-26549.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/context.py | 8 ++++++++
python/pyspark/tests/test_worker.py | 12 +++++++++++-
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 64178eb..316fbc8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -498,6 +498,14 @@ class SparkContext(object):
                 return start0 + int((split * size / numSlices)) * step
 
             def f(split, iterator):
+                # it's an empty iterator here but we need this line for triggering the
+                # logic of signal handling in FramedSerializer.load_stream, for instance,
+                # SpecialLengths.END_OF_DATA_SECTION in _read_with_length. Since
+                # FramedSerializer.load_stream produces a generator, the control should
+                # at least be in that function once. Here we do it by explicitly converting
+                # the empty iterator to a list, thus make sure worker reuse takes effect.
+                # See more details in SPARK-26549.
+                assert len(list(iterator)) == 0
                 return xrange(getStart(split), getStart(split + 1), step)
 
             return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
diff --git a/python/pyspark/tests/test_worker.py b/python/pyspark/tests/test_worker.py
index a33b77d..a4f108f 100644
--- a/python/pyspark/tests/test_worker.py
+++ b/python/pyspark/tests/test_worker.py
@@ -22,7 +22,7 @@ import time
 
 from py4j.protocol import Py4JJavaError
 
-from pyspark.testing.utils import ReusedPySparkTestCase, QuietTest
+from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest
 
 if sys.version_info[0] >= 3:
     xrange = range
@@ -145,6 +145,16 @@ class WorkerTests(ReusedPySparkTestCase):
             self.sc.pythonVer = version
 
 
+class WorkerReuseTest(PySparkTestCase):
+
+    def test_reuse_worker_of_parallelize_xrange(self):
+        rdd = self.sc.parallelize(xrange(20), 8)
+        previous_pids = rdd.map(lambda x: os.getpid()).collect()
+        current_pids = rdd.map(lambda x: os.getpid()).collect()
+        for pid in current_pids:
+            self.assertTrue(pid in previous_pids)
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.tests.test_worker import *
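The xrange fast path in the context.py hunk never materializes the data; each partition regenerates its own slice from `getStart`. As a sketch, assuming Python 3 where `range` stands in for `xrange` (`split_range` and `get_start` are hypothetical names, not PySpark APIs), the same boundary arithmetic can be checked outside Spark:

```python
def split_range(c, num_slices):
    """Split range c into num_slices contiguous sub-ranges.

    Mirrors the getStart arithmetic from the xrange fast path in
    SparkContext.parallelize (Python 3 range stands in for xrange).
    """
    size = len(c)
    if size == 0:
        # The original code falls back to parallelizing an empty list,
        # so every partition is empty.
        return [range(0) for _ in range(num_slices)]
    step = c[1] - c[0] if size > 1 else 1
    start0 = c[0]

    def get_start(split):
        # First element of partition `split`, scaled by the range's step.
        return start0 + int((split * size / num_slices)) * step

    return [range(get_start(s), get_start(s + 1), step) for s in range(num_slices)]
```

For `range(20)` split into 8 slices, as in the new test, the partitions alternate between 2 and 3 elements: `[0, 1]`, `[2, 3, 4]`, `[5, 6]`, and so on up to `[17, 18, 19]`. Because each sub-range depends only on the partition index, `f` never needs its input iterator for the data itself; the patch reads it solely to trigger the end-of-data handling.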