Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/14 00:24:41 UTC

zeppelin git commit: [ZEPPELIN-1883] Can't import spark submitted packages in PySpark

Repository: zeppelin
Updated Branches:
  refs/heads/master 59e15c03d -> cb8e41870


[ZEPPELIN-1883] Can't import spark submitted packages in PySpark

### What is this PR for?

Fixed importing packages in PySpark that are requested via `SPARK_SUBMIT_OPTIONS`.
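
The idea of the fix, as a minimal sketch in plain Python (illustrative only; the actual change lives in `PySparkInterpreter.setupPySparkEnv()`, shown in the diff below): jars resolved from `--packages` are recorded in the Spark conf key `spark.jars`, and since jars are zip archives, appending them to `PYTHONPATH` lets Python import modules shipped inside them.

```
import os

def extend_pythonpath_with_spark_jars(spark_conf):
    """Illustrative helper (hypothetical name): append the spark-submit
    jars to PYTHONPATH. `spark_conf` is assumed to be dict-like, e.g.
    {"spark.jars": "/path/a.jar,/path/b.jar"}."""
    jars = spark_conf.get("spark.jars", "")  # comma-separated jar paths
    if jars:
        current = os.environ.get("PYTHONPATH", "")
        # PYTHONPATH entries are colon-separated on POSIX systems
        os.environ["PYTHONPATH"] = current + ":" + jars.replace(",", ":")
```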

### What type of PR is it?
[Bug Fix]

### Todos

Nothing

### What is the Jira issue?

[ZEPPELIN-1883](https://issues.apache.org/jira/browse/ZEPPELIN-1883)

### How should this be tested?

0. Download Apache Spark 1.6.2 (the most recent version that pyspark-cassandra supports)

1. Set `SPARK_HOME` and `SPARK_SUBMIT_OPTIONS` in `conf/zeppelin-env.sh`, for example:

```sh
export SPARK_HOME="$HOME/github/apache-spark/1.6.2-bin-hadoop2.6"
export SPARK_SUBMIT_OPTIONS="--packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.2,TargetHolding:pyspark-cassandra:0.3.5 --exclude-packages org.slf4j:slf4j-api"
```
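
Note: per the diff below, the new `PYTHONPATH` handling applies only when Zeppelin launches the interpreter through `spark-submit` (which it does when `SPARK_HOME` is set) and the master is not running in yarn-client mode.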

2. Verify beforehand that you can run `spark-submit` successfully:

```sh
./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.2,TargetHolding:pyspark-cassandra:0.3.5 --exclude-packages org.slf4j:slf4j-api --class org.apache.spark.examples.SparkPi lib/spark-examples-1.6.2-hadoop2.6.0.jar
```

3. Test whether the submitted packages can be imported:

```
%pyspark

import pyspark_cassandra
```
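
If the import succeeds, a quick probe can confirm the jars actually landed on the Python path. This paragraph is illustrative and not part of the PR; the keyspace and table names are placeholders, and the read check assumes a reachable Cassandra cluster:

```
%pyspark

import os, sys
import pyspark_cassandra

# The --packages jars should now appear on PYTHONPATH (and hence sys.path),
# which is what makes the import above work.
print(os.environ.get("PYTHONPATH", ""))
print([p for p in sys.path if p.endswith(".jar")])

# Optional read check; "my_keyspace" / "my_table" are placeholders:
# rows = sc.cassandraTable("my_keyspace", "my_table")
# print(rows.first())
```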

### Screenshots (if appropriate)

Before this fix, step 3 above failed with:

```
import pyspark_cassandra

Traceback (most recent call last):
  File "/var/folders/lr/8g9y625n5j39rz6qhkg8s6640000gn/T/zeppelin_pyspark-5266742863961917074.py", line 267, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/var/folders/lr/8g9y625n5j39rz6qhkg8s6640000gn/T/zeppelin_pyspark-5266742863961917074.py", line 265, in <module>
    exec(code)
  File "<stdin>", line 1, in <module>
ImportError: No module named pyspark_cassandra
```
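
With this change applied (and the Spark interpreter restarted), the same paragraph should import `pyspark_cassandra` without the `ImportError` above.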

### Questions:
* Do the license files need to be updated? - NO
* Are there breaking changes for older versions? - NO
* Does this need documentation? - NO

Author: 1ambda <1a...@gmail.com>

Closes #1831 from 1ambda/ZEPPELIN-1883/cant-import-submitted-packages-in-pyspark and squashes the following commits:

585d48a [1ambda] Use spark.jars instead of classpath
f76d2c8 [1ambda] fix: Do not extend PYTHONPATH in yarn-client
c735bd5 [1ambda] fix: Import spark submit packages in pyspark


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/cb8e4187
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/cb8e4187
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/cb8e4187

Branch: refs/heads/master
Commit: cb8e4187029fdd7b892d84f23efd51acaed65f78
Parents: 59e15c0
Author: 1ambda <1a...@gmail.com>
Authored: Wed Jan 11 14:38:01 2017 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Jan 13 16:24:27 2017 -0800

----------------------------------------------------------------------
 .../apache/zeppelin/spark/PySparkInterpreter.java  | 17 +++++++++++++++--
 .../apache/zeppelin/spark/SparkInterpreter.java    |  7 +++----
 2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cb8e4187/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 58f17e9..5a8e040 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -153,7 +153,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     }
 
     urls = urlList.toArray(urls);
-
     ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
     try {
       URLClassLoader newCl = new URLClassLoader(urls, oldCl);
@@ -169,11 +168,25 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
 
   private Map setupPySparkEnv() throws IOException{
     Map env = EnvironmentUtils.getProcEnvironment();
+
     if (!env.containsKey("PYTHONPATH")) {
       SparkConf conf = getSparkConf();
-      env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") + 
+      env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
               ":../interpreter/lib/python");
     }
+
+    // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
+    // also, add all packages to PYTHONPATH since there might be transitive dependencies
+    if (SparkInterpreter.useSparkSubmit() &&
+        !getSparkInterpreter().isYarnMode()) {
+
+      String sparkSubmitJars = getSparkConf().get("spark.jars").replace(",", ":");
+
+      if (!"".equals(sparkSubmitJars)) {
+        env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitJars);
+      }
+    }
+
     return env;
   }
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cb8e4187/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 16bc4ba..7882303 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -295,7 +295,7 @@ public class SparkInterpreter extends Interpreter {
     return (DepInterpreter) p;
   }
 
-  private boolean isYarnMode() {
+  public boolean isYarnMode() {
     return getProperty("master").startsWith("yarn");
   }
 
@@ -555,7 +555,7 @@ public class SparkInterpreter extends Interpreter {
     return (o instanceof String) ? (String) o : "";
   }
 
-  private boolean useSparkSubmit() {
+  public static boolean useSparkSubmit() {
     return null != System.getenv("SPARK_SUBMIT");
   }
 
@@ -726,7 +726,6 @@ public class SparkInterpreter extends Interpreter {
     pathSettings.v_$eq(classpath);
     settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
 
-
     // set classloader for scala compiler
     settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
         .getContextClassLoader()));
@@ -979,7 +978,7 @@ public class SparkInterpreter extends Interpreter {
     }
   }
 
-  private List<File> currentClassPath() {
+  public List<File> currentClassPath() {
     List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
     String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
     if (cps != null) {