Posted to commits@spark.apache.org by an...@apache.org on 2014/11/15 05:14:37 UTC

spark git commit: [SPARK-4415] [PySpark] JVM should exit after Python exit

Repository: spark
Updated Branches:
  refs/heads/master 303a4e4d2 -> 7fe08b43c


[SPARK-4415] [PySpark] JVM should exit after Python exit

When the JVM is started from a Python process, it should exit once its stdin is closed.
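
The mechanism is easy to see in miniature. Below is a hedged sketch in plain Python (illustrative only, not Spark code; the child script is a stand-in for the JVM): the parent launches a child with a pipe on the child's stdin and sets IS_SUBPROCESS=1, and when the parent exits the OS closes its end of the pipe, the child's read returns EOF, and the child shuts itself down.

```
import os
import sys
from subprocess import PIPE, Popen

# Illustrative stand-in for the JVM: if told it runs as a subprocess,
# block on stdin. read() returns EOF only when the parent's end of the
# pipe closes, i.e. when the parent process has exited.
CHILD = r"""
import os, sys
if os.environ.get("IS_SUBPROCESS") == "1":
    sys.stdin.buffer.read()
"""

if __name__ == "__main__":
    env = dict(os.environ)
    env["IS_SUBPROCESS"] = "1"   # the same hint this commit passes to the JVM
    child = Popen([sys.executable, "-c", CHILD], stdin=PIPE, env=env)
    # No explicit cleanup: when this parent exits, the OS closes its end of
    # the pipe, the child's read() returns, and the child exits on its own.
```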

Test: add spark.driver.memory to conf/spark-defaults.conf, then check with jps that no JVM is left behind after Python exits:

```
daviesdm:~/work/spark$ cat conf/spark-defaults.conf
spark.driver.memory       8g
daviesdm:~/work/spark$ bin/pyspark
>>> quit
daviesdm:~/work/spark$ jps
4931 Jps
286
daviesdm:~/work/spark$ python wc.py
943738
0.719928026199
daviesdm:~/work/spark$ jps
286
4990 Jps
```

Author: Davies Liu <da...@databricks.com>

Closes #3274 from davies/exit and squashes the following commits:

df0e524 [Davies Liu] address comments
ce8599c [Davies Liu] address comments
050651f [Davies Liu] JVM should exit after Python exit


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fe08b43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fe08b43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fe08b43

Branch: refs/heads/master
Commit: 7fe08b43c78bf9e8515f671e72aa03a83ea782f8
Parents: 303a4e4
Author: Davies Liu <da...@databricks.com>
Authored: Fri Nov 14 20:13:46 2014 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Nov 14 20:14:33 2014 -0800

----------------------------------------------------------------------
 bin/pyspark                                              |  2 --
 bin/pyspark2.cmd                                         |  1 -
 .../spark/deploy/SparkSubmitDriverBootstrapper.scala     | 11 ++++++-----
 python/pyspark/java_gateway.py                           |  4 +++-
 4 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index 1d8c94d..0b4f695 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
 else
-  # PySpark shell requires special handling downstream
-  export PYSPARK_SHELL=1
   exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/bin/pyspark2.cmd
----------------------------------------------------------------------
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 59415e9..a542ec8 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do (
 )
 
 if [%PYTHON_FILE%] == [] (
-  set PYSPARK_SHELL=1
   if [%IPYTHON%] == [1] (
 	ipython %IPYTHON_OPTS%
   ) else (

http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 7ffff29..aa3743c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper {
     // subprocess there already reads directly from our stdin, so we should avoid spawning a
     // thread that contends with the subprocess in reading from System.in.
     val isWindows = Utils.isWindows
-    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    val isSubprocess = sys.env.contains("IS_SUBPROCESS")
     if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
-      // should terminate on broken pipe, which signals that the parent process has exited. In
-      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
-      if (isPySparkShell) {
+      // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
+      // broken pipe, signaling that the parent process has exited. This is the case if the
+      // application is launched directly from python, as in the PySpark shell. In Windows,
+      // the termination logic is handled in java_gateway.py
+      if (isSubprocess) {
         stdinThread.join()
         process.destroy()
       }
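
For contrast with the Scala hunk above, here is a hedged Python analogue of the bootstrapper's logic (an illustration under assumptions, not Spark code; `cat` is a Unix-only stand-in for the spark-submit subprocess): forward our own stdin to the child, and once stdin reaches EOF, meaning our parent has exited, take the child down with us.

```
import os
import shutil
import sys
import threading
from subprocess import PIPE, Popen

child = Popen(["cat"], stdin=PIPE)   # stand-in for the spark-submit subprocess

def redirect_stdin():
    # copyfileobj returns once our stdin hits EOF (the parent is gone)
    shutil.copyfileobj(sys.stdin.buffer, child.stdin)

t = threading.Thread(target=redirect_stdin, name="redirect stdin", daemon=True)
t.start()

if "IS_SUBPROCESS" in os.environ:
    t.join()            # analogue of stdinThread.join()
    child.terminate()   # analogue of process.destroy()
```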

http://git-wip-us.apache.org/repos/asf/spark/blob/7fe08b43/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 9c70fa5..a975dc1 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -45,7 +45,9 @@ def launch_gateway():
             # Don't send ctrl-c / SIGINT to the Java gateway:
             def preexec_func():
                 signal.signal(signal.SIGINT, signal.SIG_IGN)
-            proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+            env = dict(os.environ)
+            env["IS_SUBPROCESS"] = "1"  # tell JVM to exit after python exits
+            proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env)
         else:
             # preexec_fn not supported on Windows
             proc = Popen(command, stdout=PIPE, stdin=PIPE)

