Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/12 15:23:50 UTC

[spark] branch master updated: [SPARK-27041][PYSPARK] Use imap() for python 2.x to resolve oom issue

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 60a899b  [SPARK-27041][PYSPARK] Use imap() for python 2.x to resolve oom issue
60a899b is described below

commit 60a899b8c33e1e36bb80ab9fa054ba40bee9f4be
Author: TigerYang414 <39...@users.noreply.github.com>
AuthorDate: Tue Mar 12 10:23:26 2019 -0500

    [SPARK-27041][PYSPARK] Use imap() for python 2.x to resolve oom issue
    
    ## What changes were proposed in this pull request?
    
    With a large partition, PySpark may exceed the executor memory limit and trigger an out-of-memory error on Python 2.7.
    This happens because map() is used. Unlike in Python 3.x, Python 2.7's map() builds a full result list and therefore has to read all of the data into memory.
    
    The proposed fix uses imap() on Python 2.7, and it has been verified.
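    
    As an illustration (plain Python, not Spark code) of the eager-vs-lazy distinction the fix relies on:
    on Python 2.7 the builtin map() builds the whole result list up front, while itertools.imap yields
    one result at a time, so aliasing map to imap keeps memory roughly bounded to one record.
    A minimal sketch, assuming nothing beyond the standard library:
    
        import sys
    
        if sys.version_info[0] < 3:
            # Same shim the patch adds to worker.py: alias the eager builtin
            # map to the lazy itertools.imap.
            from itertools import imap as map
    
        def double(x):
            return x * 2
    
        mapped = map(double, range(5))
        # On Python 3 (and on Python 2 with the shim above) `mapped` is an
        # iterator, not a fully materialized list.
        print(list(mapped))  # [0, 2, 4, 6, 8]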
    
    ## How was this patch tested?
    Manual test.
    
    Closes #23954 from TigerYang414/patch-1.
    
    Lead-authored-by: TigerYang414 <39...@users.noreply.github.com>
    Co-authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 python/pyspark/sql/tests/test_udf.py | 7 +++++++
 python/pyspark/worker.py             | 2 ++
 2 files changed, 9 insertions(+)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 0a56ba8..ba00bba 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -600,6 +600,13 @@ class UDFTests(ReusedSQLTestCase):
             result = sql("select i from values(0L) as data(i) where i in (select id from v)")
             self.assertEqual(result.collect(), [Row(i=0)])
 
+    def test_udf_globals_not_overwritten(self):
+        @udf('string')
+        def f():
+            assert "itertools" not in str(map)
+
+        self.spark.range(1).select(f()).collect()
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 0e9b6d6..7811012 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -45,6 +45,8 @@ from pyspark import shuffle
 
 if sys.version >= '3':
     basestring = str
+else:
+    from itertools import imap as map  # use iterator map by default
 
 pickleSer = PickleSerializer()
 utf8_deserializer = UTF8Deserializer()

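Note on the new test: the alias is added at module level in worker.py, so it must not leak into the
globals a user-defined function sees, which is what the `assert "itertools" not in str(map)` check in
test_udf_globals_not_overwritten verifies. A minimal sketch of that name-binding property (plain Python,
not Spark code; itertools.chain stands in for imap, which exists only on Python 2, and the module names
are made up for the illustration):

    import itertools
    import types

    # Simulate worker.py rebinding `map` in its own namespace only.
    worker_like = types.ModuleType("worker_like")
    worker_like.map = itertools.chain  # stand-in for the Python 2-only imap alias

    assert "itertools" in repr(worker_like.map)  # the "worker" module sees the alias
    assert "itertools" not in str(map)           # this module still sees the builtin
    print("builtin map is still visible here:", map)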

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org