Posted to commits@spark.apache.org by jo...@apache.org on 2014/10/25 10:20:52 UTC

git commit: [SPARK-4088] [PySpark] Python worker should exit after socket is closed by JVM

Repository: spark
Updated Branches:
  refs/heads/master 953031688 -> e41786c77


[SPARK-4088] [PySpark] Python worker should exit after socket is closed by JVM

After take() or an exception in Python, the Python worker may exit before the JVM has read() the whole response, and the write thread may then raise a "Connection reset" exception.

Python should always wait for the JVM to close the socket first.
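
The "wait for the peer to close first" idea can be sketched outside Spark as follows. This is only an illustration under stated assumptions, not the code in daemon.py: wait_for_peer_close, demo and fake_jvm are hypothetical names, socket.socketpair() stands in for the JVM-to-worker connection, and the sketch catches OSError where the patch catches Exception.

```python
import socket
import threading


def wait_for_peer_close(sock, bufsize=1024):
    """Block until the peer closes its end of `sock`, discarding any data.

    Hypothetical helper for illustration; returns the number of bytes
    that were discarded while waiting.
    """
    discarded = 0
    try:
        while True:
            chunk = sock.recv(bufsize)
            if not chunk:  # empty read: the peer closed the connection
                break
            discarded += len(chunk)
    except OSError:
        # A reset or other socket error also means the peer is gone.
        pass
    return discarded


def demo():
    # socketpair() is an assumption standing in for the JVM <-> worker socket.
    jvm_side, worker_side = socket.socketpair()

    def fake_jvm():
        jvm_side.sendall(b"bye")  # trailing bytes the worker will ignore
        jvm_side.close()          # the "JVM" side closes first

    t = threading.Thread(target=fake_jvm)
    t.start()
    discarded = wait_for_peer_close(worker_side)
    t.join()
    worker_side.close()           # only now does the "worker" side close
    return discarded
```

Calling demo() returns once the simulated JVM side has closed. The worker side never closes the connection while the peer might still be reading or writing, which is the ordering the commit message asks for.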

cc JoshRosen. This is a warm fix; without it the tests will be flaky. Sorry about that.

Author: Davies Liu <da...@databricks.com>

Closes #2941 from davies/fix_exit and squashes the following commits:

9d4d21e [Davies Liu] fix race


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e41786c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e41786c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e41786c7

Branch: refs/heads/master
Commit: e41786c77482d3f9e3c01cfd583c8899815c3106
Parents: 9530316
Author: Davies Liu <da...@databricks.com>
Authored: Sat Oct 25 01:20:39 2014 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sat Oct 25 01:20:39 2014 -0700

----------------------------------------------------------------------
 python/pyspark/daemon.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e41786c7/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index dbb3477..f09587f 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -62,8 +62,7 @@ def worker(sock):
         exit_code = compute_real_exit_code(exc.code)
     finally:
         outfile.flush()
-        if exit_code:
-            os._exit(exit_code)
+    return exit_code
 
 
 # Cleanup zombie children
@@ -160,10 +159,13 @@ def manager():
                         outfile.flush()
                         outfile.close()
                         while True:
-                            worker(sock)
-                            if not reuse:
+                            code = worker(sock)
+                            if not reuse or code:
                                 # wait for closing
-                                while sock.recv(1024):
+                                try:
+                                    while sock.recv(1024):
+                                        pass
+                                except Exception:
                                     pass
                                 break
                             gc.collect()

