Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/30 08:24:47 UTC

git commit: SPARK-1004. PySpark on YARN

Repository: spark
Updated Branches:
  refs/heads/master 7025dda8f -> ff5be9a41


SPARK-1004.  PySpark on YARN

This reopens https://github.com/apache/incubator-spark/pull/640 against the new repo

Author: Sandy Ryza <sa...@cloudera.com>

Closes #30 from sryza/sandy-spark-1004 and squashes the following commits:

89889d4 [Sandy Ryza] Move unzipping py4j to the generate-resources phase so that it gets included in the jar the first time
5165a02 [Sandy Ryza] Fix docs
fd0df79 [Sandy Ryza] PySpark on YARN


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff5be9a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff5be9a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff5be9a4

Branch: refs/heads/master
Commit: ff5be9a41e52454e0f9cae83dd1fd50fbeaa684a
Parents: 7025dda
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Tue Apr 29 23:24:34 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Apr 29 23:24:34 2014 -0700

----------------------------------------------------------------------
 bin/pyspark                                     |  1 +
 bin/pyspark2.cmd                                |  1 +
 core/pom.xml                                    | 42 ++++++++++++++++++++
 .../spark/api/python/PythonWorkerFactory.scala  | 10 +----
 docs/python-programming-guide.md                |  3 ++
 python/.gitignore                               |  3 ++
 python/lib/PY4J_VERSION.txt                     |  1 -
 python/pyspark/__init__.py                      |  7 ----
 python/pyspark/java_gateway.py                  | 29 +++++++++++++-
 python/pyspark/tests.py                         |  4 +-
 sbin/spark-config.sh                            |  3 ++
 11 files changed, 85 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index cad982b..f555885 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
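
A note on why putting the zip on PYTHONPATH works: Python's zipimport machinery loads modules directly from archives on the module search path. A minimal sketch, assuming SPARK_HOME is set and the py4j zip sits at the path added above (this mirrors the sys.path hack removed from python/pyspark/__init__.py further down):

    import os
    import sys

    # Same effect as the PYTHONPATH export above, done from inside Python.
    sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"],
                                    "python/lib/py4j-0.8.1-src.zip"))

    import py4j  # resolved by zipimport straight out of the archive
    print(py4j.__file__)  # path ends inside py4j-0.8.1-src.zip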

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/bin/pyspark2.cmd
----------------------------------------------------------------------
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 9579109..d7cfd5e 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
 if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
 
 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 73f573a..822b5b1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -294,6 +294,48 @@
           </environmentVariables>
         </configuration>
       </plugin>
+      <!-- Unzip py4j so we can include its files in the jar -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>unzip</executable>
+          <workingDirectory>../python</workingDirectory>
+          <arguments>
+            <argument>-o</argument>
+            <argument>lib/py4j*.zip</argument>
+            <argument>-d</argument>
+            <argument>build</argument>
+          </arguments>
+        </configuration>
+      </plugin>
     </plugins>
+    
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../python</directory>
+        <includes>
+          <include>pyspark/*.py</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>../python/build</directory>
+        <includes>
+          <include>py4j/*.py</include>
+        </includes>
+      </resource>
+    </resources>
   </build>
 </project>
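
The plugin and resource stanzas above pull pyspark/*.py and the unzipped py4j/*.py into the built jar. Because a jar is an ordinary zip archive, the result is easy to inspect; a small sketch, with a hypothetical jar path standing in for whatever your build actually produces:

    import zipfile

    # Hypothetical path; point this at the jar your build produced.
    jar = zipfile.ZipFile("core/target/spark-core.jar")
    bundled = [n for n in jar.namelist()
               if n.startswith("pyspark/") or n.startswith("py4j/")]
    print(bundled[:10])  # expect entries like pyspark/context.py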

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index a5f0f3d..02799ce 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
 
       // Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
       try {
         // Create and start the daemon
-        val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-        val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+        val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars)
-        val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-        workerEnv.put("PYTHONPATH", pythonPath)
         daemon = pb.start()
 
         // Redirect the stderr to ours
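
Both hunks swap an absolute script path, which forced every executor to know SPARK_HOME, for "python -m", which finds pyspark.worker and pyspark.daemon on whatever PYTHONPATH the environment supplies: a directory, a zip, or the Spark jar itself on YARN. A rough Python rendering of the same launch pattern (illustrative only; the real worker immediately blocks on its socket protocol):

    import os
    import subprocess

    # Inherit the caller's environment; PYTHONPATH must already make the
    # pyspark package importable (as a directory, zip, or jar).
    env = dict(os.environ)
    worker = subprocess.Popen(["python", "-m", "pyspark.worker"], env=env)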

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/docs/python-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 98233bf..98c4562 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
 Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
 The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
 
+# Running PySpark on YARN
+
+To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
 
 # Interactive Use
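
Exporting MASTER=yarn-client before running bin/pyspark is the shell-level route the new section above describes; the same choice can also be made in application code through SparkConf. A sketch (the app name is arbitrary):

    from pyspark import SparkConf, SparkContext

    # Equivalent to exporting MASTER=yarn-client before bin/pyspark.
    conf = SparkConf().setMaster("yarn-client").setAppName("example")
    sc = SparkContext(conf=conf)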
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
index 5c56e63..80b361f 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -1,2 +1,5 @@
 *.pyc
 docs/
+pyspark.egg-info
+build/
+dist/

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/python/lib/PY4J_VERSION.txt
----------------------------------------------------------------------
diff --git a/python/lib/PY4J_VERSION.txt b/python/lib/PY4J_VERSION.txt
deleted file mode 100644
index 04a0cd5..0000000
--- a/python/lib/PY4J_VERSION.txt
+++ /dev/null
@@ -1 +0,0 @@
-b7924aabe9c5e63f0a4d8bbd17019534c7ec014e

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 73fe737..07df869 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -49,13 +49,6 @@ Hive:
      Main entry point for accessing data stored in Apache Hive.
 """
 
-
-
-import sys
-import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))
-
-
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 from pyspark.sql import SQLContext

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 6bb6c87..032d960 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -24,10 +24,11 @@ from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 
 
-SPARK_HOME = os.environ["SPARK_HOME"]
+def launch_gateway():
+    SPARK_HOME = os.environ["SPARK_HOME"]
 
+    set_env_vars_for_yarn()
 
-def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def launch_gateway():
     java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
     java_import(gateway.jvm, "scala.Tuple2")
     return gateway
+
+def set_env_vars_for_yarn():
+    # Add the spark jar, which includes the pyspark files, to the python path
+    env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
+    if "PYTHONPATH" in env_map:
+        env_map["PYTHONPATH"] += ":spark.jar"
+    else:
+        env_map["PYTHONPATH"] = "spark.jar"
+
+    os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
+
+def parse_env(env_str):
+    # Turns a comma-separated list of env settings into a dict that maps env
+    # vars to their values.
+    env = {}
+    for var_str in env_str.split(","):
+        parts = var_str.split("=")
+        if len(parts) == 2:
+            env[parts[0]] = parts[1]
+        elif len(var_str) > 0:
+            print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
+            sys.exit(1)
+    
+    return env
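
To see what set_env_vars_for_yarn does to SPARK_YARN_USER_ENV end to end, here is a self-contained trace with hypothetical variable values (the logic is restated inline so the snippet runs on its own):

    import os

    os.environ["SPARK_YARN_USER_ENV"] = "FOO=1,PYTHONPATH=/opt/libs"

    # parse_env: comma-separated NAME=value pairs -> dict
    env_map = dict(kv.split("=") for kv in
                   os.environ["SPARK_YARN_USER_ENV"].split(","))

    # Append the jar (which now carries the pyspark files) to PYTHONPATH.
    if "PYTHONPATH" in env_map:
        env_map["PYTHONPATH"] += ":spark.jar"
    else:
        env_map["PYTHONPATH"] = "spark.jar"

    os.environ["SPARK_YARN_USER_ENV"] = ",".join(
        k + "=" + v for (k, v) in env_map.items())
    print(os.environ["SPARK_YARN_USER_ENV"])
    # e.g. FOO=1,PYTHONPATH=/opt/libs:spark.jar (dict order may vary)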

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 5271045..8cf9d9c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -30,10 +30,12 @@ import unittest
 
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
-from pyspark.java_gateway import SPARK_HOME
 from pyspark.serializers import read_int
 
 
+SPARK_HOME = os.environ["SPARK_HOME"]
+
+
 class PySparkTestCase(unittest.TestCase):
 
     def setUp(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/ff5be9a4/sbin/spark-config.sh
----------------------------------------------------------------------
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index cd2c7b7..147b506 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -34,3 +34,6 @@ this="$config_bin/$script"
 export SPARK_PREFIX=`dirname "$this"`/..
 export SPARK_HOME=${SPARK_PREFIX}
 export SPARK_CONF_DIR="$SPARK_HOME/conf"
+# Add the PySpark classes to the PYTHONPATH:
+export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH