Posted to commits@spark.apache.org by va...@apache.org on 2016/08/12 03:08:54 UTC

spark git commit: [SPARK-13081][PYSPARK][SPARK_SUBMIT] Allow setting pythonExec of driver and executor through conf…

Repository: spark
Updated Branches:
  refs/heads/master ea0bf91b4 -> 7a9e25c38


[SPARK-13081][PYSPARK][SPARK_SUBMIT] Allow setting pythonExec of driver and executor through conf…

Before this PR, users had to export environment variables to specify the Python executable for the driver and executors, which is inconvenient. This PR lets users specify the Python executable through the configuration properties "spark.pyspark.driver.python" and "spark.pyspark.python".
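
As a usage sketch: with spark-submit these are passed as --conf spark.pyspark.driver.python=python3.4 --conf spark.pyspark.python=python3.5 (exactly what the SparkSubmitSuite change below exercises). The Scala fragment below only illustrates the new keys; the values are placeholders, and in practice the properties must reach spark-submit (e.g. via --conf or spark-defaults.conf) before the driver's Python process is launched:

    import org.apache.spark.SparkConf

    // Placeholder values; any Python executable available on the nodes works.
    val conf = new SparkConf()
      .set("spark.pyspark.driver.python", "python3.4")
      .set("spark.pyspark.python", "python3.5")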

Manually tested in local and YARN mode, for both the pyspark shell and PySpark batch mode.

Author: Jeff Zhang <zj...@apache.org>

Closes #13146 from zjffdu/SPARK-13081.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a9e25c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a9e25c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a9e25c3

Branch: refs/heads/master
Commit: 7a9e25c38380e6c62080d62ad38a4830e44fe753
Parents: ea0bf91
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu Aug 11 20:08:25 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Aug 11 20:08:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/PythonRunner.scala  | 14 ++++++++++---
 .../apache/spark/internal/config/package.scala  |  8 ++++++++
 .../spark/launcher/SparkLauncherSuite.java      |  8 ++++++++
 .../scala/org/apache/spark/SparkConfSuite.scala |  2 ++
 .../apache/spark/deploy/SparkSubmitSuite.scala  |  5 +++++
 docs/configuration.md                           | 21 ++++++++++++++++++--
 .../apache/spark/launcher/SparkLauncher.java    |  4 ++++
 .../launcher/SparkSubmitCommandBuilder.java     | 18 ++++++++++++++---
 8 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 6227a30..0b1cec2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 import scala.util.Try
 
-import org.apache.spark.SparkUserAppException
+import org.apache.spark.{SparkConf, SparkUserAppException}
 import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{RedirectThread, Utils}
 
 /**
@@ -37,8 +38,12 @@ object PythonRunner {
     val pythonFile = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
-    val pythonExec =
-      sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
+    val sparkConf = new SparkConf()
+    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+      .orElse(sparkConf.get(PYSPARK_PYTHON))
+      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+      .orElse(sys.env.get("PYSPARK_PYTHON"))
+      .getOrElse("python")
 
     // Format python file paths before adding them to the PYTHONPATH
     val formattedPythonFile = formatPath(pythonFile)
@@ -77,6 +82,9 @@ object PythonRunner {
     // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
     env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+    // Pass the conf spark.pyspark.python to the python process; the only way to pass
+    // info to the python process is through an environment variable.
+    sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
     try {
       val process = builder.start()
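
For readers skimming the diff, the precedence chain above can be written standalone; a minimal sketch (not Spark source) with plain maps standing in for SparkConf and the process environment:

    // Resolve the Python executable with the same precedence as the patched
    // PythonRunner: conf spark.pyspark.driver.python, then conf
    // spark.pyspark.python, then the two environment variables, then "python".
    def resolvePythonExec(conf: Map[String, String], env: Map[String, String]): String =
      conf.get("spark.pyspark.driver.python")
        .orElse(conf.get("spark.pyspark.python"))
        .orElse(env.get("PYSPARK_DRIVER_PYTHON"))
        .orElse(env.get("PYSPARK_PYTHON"))
        .getOrElse("python")

    resolvePythonExec(Map("spark.pyspark.python" -> "python3.5"), sys.env)  // "python3.5"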

http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e646d99..be3dac4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -106,4 +106,12 @@ package object config {
   private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
     .stringConf
     .createOptional
+
+  private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
+    .stringConf
+    .createOptional
+
+  private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
+    .stringConf
+    .createOptional
 }
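
Because both entries are created with createOptional, reading them yields Option[String] rather than throwing on an unset key; this is what the PythonRunner change above relies on. A sketch (note that SparkConf.get(ConfigEntry) is private[spark], so this only compiles inside Spark itself):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._

    val sparkConf = new SparkConf()
    // None when spark.pyspark.python is unset; Some(value) otherwise.
    val exec: Option[String] = sparkConf.get(PYSPARK_PYTHON)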

http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index e393db0..682d988 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 import static org.junit.Assert.*;
 
+import org.apache.spark.internal.config.package$;
+
 /**
  * These tests require the Spark assembly to be built before they can be run.
  */
@@ -89,6 +91,12 @@ public class SparkLauncherSuite {
     launcher.setConf("spark.foo", "foo");
     launcher.addSparkArg(opts.CONF, "spark.foo=bar");
     assertEquals("bar", launcher.builder.conf.get("spark.foo"));
+
+    launcher.setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "python3.4");
+    launcher.setConf(SparkLauncher.PYSPARK_PYTHON, "python3.5");
+    assertEquals("python3.4", launcher.builder.conf.get(
+      package$.MODULE$.PYSPARK_DRIVER_PYTHON().key()));
+    assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key()));
   }
 
   @Test(expected=IllegalStateException.class)

http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index a883d1b..1f0f655 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -51,8 +51,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
   test("loading from system properties") {
     System.setProperty("spark.test.testProperty", "2")
+    System.setProperty("nonspark.test.testProperty", "0")
     val conf = new SparkConf()
     assert(conf.get("spark.test.testProperty") === "2")
+    assert(!conf.contains("nonspark.test.testProperty"))
   }
 
   test("initializing without loading defaults") {

http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b2bc886..961ece3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.TestUtils.JavaSourceFromString
 import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -512,6 +513,8 @@ class SparkSubmitSuite
     val clArgs3 = Seq(
       "--master", "local",
       "--py-files", pyFiles,
+      "--conf", "spark.pyspark.driver.python=python3.4",
+      "--conf", "spark.pyspark.python=python3.5",
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
@@ -519,6 +522,8 @@ class SparkSubmitSuite
     appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
     sysProps3("spark.submit.pyFiles") should be (
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+    sysProps3(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
+    sysProps3(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
   test("resolves config paths correctly") {

http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index e33094b..ae75318 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -427,6 +427,21 @@ Apart from these, the following properties are also available, and may be useful
     with <code>spark.jars.packages</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.pyspark.driver.python</code></td>
+  <td></td>
+  <td>
+    Python binary executable to use for PySpark in driver.
+    (default is <code>spark.pyspark.python</code>)
+  </td>
+</tr>
+<tr>
+  <td><code>spark.pyspark.python</code></td>
+  <td></td>
+  <td>
+    Python binary executable to use for PySpark in both driver and executors.
+  </td>
+</tr>
 </table>
 
 #### Shuffle Behavior
@@ -1786,11 +1801,13 @@ The following variables can be set in `spark-env.sh`:
   </tr>
   <tr>
     <td><code>PYSPARK_PYTHON</code></td>
-    <td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).</td>
+    <td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).
+    Property <code>spark.pyspark.python</code> takes precedence if it is set.</td>
   </tr>
   <tr>
     <td><code>PYSPARK_DRIVER_PYTHON</code></td>
-    <td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).</td>
+    <td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).
+    Property <code>spark.pyspark.driver.python</code> takes precedence if it is set.</td>
   </tr>
   <tr>
     <td><code>SPARKR_DRIVER_R</code></td>

http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 41f7f1f..7b7a7bf 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -64,6 +64,10 @@ public class SparkLauncher {
   /** Configuration key for the number of executor CPU cores. */
   public static final String EXECUTOR_CORES = "spark.executor.cores";
 
+  static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";
+
+  static final String PYSPARK_PYTHON = "spark.pyspark.python";
+
   /** Logger name to use when launching a child process. */
   public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7a9e25c3/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index b3ccc48..f6da644 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -294,11 +294,23 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     appResource = PYSPARK_SHELL_RESOURCE;
     constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");
 
-    // The executable is the PYSPARK_DRIVER_PYTHON env variable set by the pyspark script,
-    // followed by PYSPARK_DRIVER_PYTHON_OPTS.
+    // Pick up the Python binary executable in the following order:
+    // 1. conf spark.pyspark.driver.python
+    // 2. conf spark.pyspark.python
+    // 3. environment variable PYSPARK_DRIVER_PYTHON
+    // 4. environment variable PYSPARK_PYTHON
+    // 5. python
     List<String> pyargs = new ArrayList<>();
-    pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
+    pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+      conf.get(SparkLauncher.PYSPARK_PYTHON),
+      System.getenv("PYSPARK_DRIVER_PYTHON"),
+      System.getenv("PYSPARK_PYTHON"),
+      "python"));
     String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+    if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
+      // Pass the conf spark.pyspark.python to python via an environment variable.
+      env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+    }
     if (!isEmpty(pyOpts)) {
       pyargs.addAll(parseOptionString(pyOpts));
     }
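
firstNonEmpty above is the launcher's existing utility; assuming its usual semantics (the first candidate that is neither null nor empty wins), a Scala equivalent is:

    // Assumed semantics of the launcher's firstNonEmpty helper.
    def firstNonEmpty(candidates: String*): String =
      candidates.find(s => s != null && !s.isEmpty).orNull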

