Posted to commits@spark.apache.org by gu...@apache.org on 2020/11/10 10:48:24 UTC

[spark] branch branch-3.0 updated: [SPARK-33339][PYTHON] Pyspark application will hang due to non Exception error

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4a1c143  [SPARK-33339][PYTHON] Pyspark application will hang due to non Exception error
4a1c143 is described below

commit 4a1c143f1a042a9a23d00929670eadbdb1afca11
Author: lrz <lr...@lrzdeMacBook-Pro.local>
AuthorDate: Tue Nov 10 19:39:18 2020 +0900

    [SPARK-33339][PYTHON] Pyspark application will hang due to non Exception error
    
    ### What changes were proposed in this pull request?
    
    When user code raises SystemExit (for example via sys.exit()) while a task is running, the Python worker exits abnormally, but the executor task keeps waiting to read from the worker's socket, so the application hangs.
    The SystemExit may come from a bug in the user's code, but Spark should at least raise an error to alert the user rather than hang.
    A simple test reproduces the case:
    
    ```
    from pyspark.sql import SparkSession
    def err(line):
      raise SystemExit
    spark = SparkSession.builder.appName("test").getOrCreate()
    spark.sparkContext.parallelize(range(1,2), 2).map(err).collect()
    spark.stop()
    ```
    
    ### Why are the changes needed?
    
    To make sure a PySpark application does not hang when a non-Exception error (such as SystemExit) is raised in the Python worker.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a new test and also manually tested the case above.
    
    Closes #30248 from li36909/pyspark.
    
    Lead-authored-by: lrz <lr...@lrzdeMacBook-Pro.local>
    Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 27bb40b6297361985e3590687f0332a72b71bc85)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/tests/test_worker.py | 9 +++++++++
 python/pyspark/worker.py            | 4 ++--
 2 files changed, 11 insertions(+), 2 deletions(-)
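
Note on why the handler type matters: in Python, SystemExit derives directly from BaseException rather than from Exception, so an `except Exception` clause does not intercept it. The snippet below is a minimal, standalone sketch of that behavior; `caught_by` is a hypothetical helper written for illustration and is not part of PySpark.

```python
# SystemExit sits beside Exception in the hierarchy, not beneath it.
assert issubclass(SystemExit, BaseException)
assert not issubclass(SystemExit, Exception)


def caught_by(handler_type, exc):
    """Return True if `except handler_type` intercepts `exc` (hypothetical helper)."""
    try:
        raise exc
    except handler_type:
        return True
    except BaseException:
        # Fallback so the demo itself never exits the interpreter.
        return False


narrow = caught_by(Exception, SystemExit())      # the handler type before this patch
broad = caught_by(BaseException, SystemExit())   # the handler type after this patch
print(narrow, broad)
```

With the narrower handler the SystemExit passes straight through, which matches the hang described in the commit message: the error-reporting branch never runs.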

diff --git a/python/pyspark/tests/test_worker.py b/python/pyspark/tests/test_worker.py
index bfcbc43..f51d4b2 100644
--- a/python/pyspark/tests/test_worker.py
+++ b/python/pyspark/tests/test_worker.py
@@ -98,6 +98,15 @@ class WorkerTests(ReusedPySparkTestCase):
             self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
         self.assertEqual(100, rdd.map(str).count())
 
+    def test_after_non_exception_error(self):
+        # SPARK-33339: Pyspark application will hang due to non Exception
+        def raise_system_exit(_):
+            raise SystemExit()
+        rdd = self.sc.parallelize(range(100), 1)
+        with QuietTest(self.sc):
+            self.assertRaises(Exception, lambda: rdd.foreach(raise_system_exit))
+        self.assertEqual(100, rdd.map(str).count())
+
     def test_after_jvm_exception(self):
         tempFile = tempfile.NamedTemporaryFile(delete=False)
         tempFile.write(b"Hello World!")
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 814f796..0bce87d 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -608,7 +608,7 @@ def main(infile, outfile):
         # reuse.
         TaskContext._setTaskContext(None)
         BarrierTaskContext._setTaskContext(None)
-    except Exception:
+    except BaseException:
         try:
             exc_info = traceback.format_exc()
             if isinstance(exc_info, bytes):
@@ -622,7 +622,7 @@ def main(infile, outfile):
         except IOError:
             # JVM close the socket
             pass
-        except Exception:
+        except BaseException:
             # Write the error to stderr if it happened while serializing
             print("PySpark worker failed with exception:", file=sys.stderr)
             print(traceback.format_exc(), file=sys.stderr)
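
The effect of the change in worker.py can be sketched outside Spark as follows. This is only an illustration under stated assumptions: `run_task`, `ERROR_MARKER`, and the in-memory buffer are hypothetical stand-ins and do not reflect PySpark's actual wire protocol. The point is that catching BaseException lets the worker report a SystemExit to its peer instead of exiting silently.

```python
import io
import traceback

ERROR_MARKER = b"ERR\n"  # hypothetical marker, not PySpark's wire format


def run_task(func, outfile):
    """Run func and report any failure on outfile (hypothetical helper)."""
    try:
        result = func()
        outfile.write(b"OK\n" + repr(result).encode("utf-8"))
    except BaseException:
        # BaseException also covers SystemExit, which `except Exception` misses.
        outfile.write(ERROR_MARKER + traceback.format_exc().encode("utf-8"))


def exits():
    # Stand-in for user code that calls sys.exit() inside a task.
    raise SystemExit(1)


buf = io.BytesIO()  # stands in for the socket the executor reads from
run_task(exits, buf)
report = buf.getvalue()
print(report.decode("utf-8"))
```

A real worker would likely still terminate after reporting; the sketch swallows the error only so that the written report can be inspected.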

