Posted to commits@zeppelin.apache.org by mo...@apache.org on 2017/01/14 00:24:41 UTC
zeppelin git commit: [ZEPPELIN-1883] Can't import spark submitted packages in PySpark
Repository: zeppelin
Updated Branches:
refs/heads/master 59e15c03d -> cb8e41870
[ZEPPELIN-1883] Can't import spark submitted packages in PySpark
### What is this PR for?
Fixed importing packages in PySpark that are requested via `SPARK_SUBMIT_OPTIONS`.
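For context (background, not part of the patch): a jar is an ordinary zip archive, and CPython's `zipimport` machinery can load `.py` modules directly from zip files that appear on `sys.path`. That is why appending the submitted jars to `PYTHONPATH` makes a package like `pyspark_cassandra` importable. A minimal standalone illustration, using a hypothetical local jar path:
```
import sys

# A jar is a zip archive; CPython's zipimport loads .py modules from
# zip files on sys.path. The jar path below is hypothetical.
sys.path.append("/tmp/jars/pyspark-cassandra-0.3.5.jar")

import pyspark_cassandra  # resolved from inside the jar, if it bundles .py sources
```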
### What type of PR is it?
[Bug Fix]
### Todos
Nothing
### What is the Jira issue?
[ZEPPELIN-1883](https://issues.apache.org/jira/browse/ZEPPELIN-1883)
### How should this be tested?
0. Download Apache Spark 1.6.2 (the most recent release supported by pyspark-cassandra)
1. Set `SPARK_HOME` and `SPARK_SUBMIT_OPTIONS` in `conf/zeppelin-env.sh` like this:
```sh
export SPARK_HOME="$HOME/github/apache-spark/1.6.2-bin-hadoop2.6"
export SPARK_SUBMIT_OPTIONS="--packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.2,TargetHolding:pyspark-cassandra:0.3.5 --exclude-packages org.slf4j:slf4j-api"
```
2. Verify first that the same `spark-submit` invocation works on its own:
```
./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.10:1.6.2,TargetHolding:pyspark-cassandra:0.3.5 --exclude-packages org.slf4j:slf4j-api --class org.apache.spark.examples.SparkPi lib/spark-examples-1.6.2-hadoop2.6.0.jar
```
3. Test whether the submitted packages can be imported:
```
%pyspark
import pyspark_cassandra
```
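With the fix applied, the import should succeed. One quick way to confirm the module really came from the submitted jar (zip-imported modules report a `__file__` inside the archive):
```
%pyspark
import pyspark_cassandra
print(pyspark_cassandra.__file__)  # should point inside the pyspark-cassandra jar
```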
### Screenshots (if appropriate)
Before the fix, the import failed with:
```
import pyspark_cassandra
Traceback (most recent call last):
File "/var/folders/lr/8g9y625n5j39rz6qhkg8s6640000gn/T/zeppelin_pyspark-5266742863961917074.py", line 267, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/var/folders/lr/8g9y625n5j39rz6qhkg8s6640000gn/T/zeppelin_pyspark-5266742863961917074.py", line 265, in <module>
exec(code)
File "<stdin>", line 1, in <module>
ImportError: No module named pyspark_cassandra
```
### Questions:
* Do the license files need an update? - NO
* Are there breaking changes for older versions? - NO
* Does this need documentation? - NO
Author: 1ambda <1a...@gmail.com>
Closes #1831 from 1ambda/ZEPPELIN-1883/cant-import-submitted-packages-in-pyspark and squashes the following commits:
585d48a [1ambda] Use spark.jars instead of classpath
f76d2c8 [1ambda] fix: Do not extend PYTHONPATH in yarn-client
c735bd5 [1ambda] fix: Import spark submit packages in pyspark
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/cb8e4187
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/cb8e4187
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/cb8e4187
Branch: refs/heads/master
Commit: cb8e4187029fdd7b892d84f23efd51acaed65f78
Parents: 59e15c0
Author: 1ambda <1a...@gmail.com>
Authored: Wed Jan 11 14:38:01 2017 +0900
Committer: Lee moon soo <mo...@apache.org>
Committed: Fri Jan 13 16:24:27 2017 -0800
----------------------------------------------------------------------
.../apache/zeppelin/spark/PySparkInterpreter.java | 17 +++++++++++++++--
.../apache/zeppelin/spark/SparkInterpreter.java | 7 +++----
2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cb8e4187/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 58f17e9..5a8e040 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -153,7 +153,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
urls = urlList.toArray(urls);
-
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
try {
URLClassLoader newCl = new URLClassLoader(urls, oldCl);
@@ -169,11 +168,25 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
private Map setupPySparkEnv() throws IOException{
Map env = EnvironmentUtils.getProcEnvironment();
+
if (!env.containsKey("PYTHONPATH")) {
SparkConf conf = getSparkConf();
- env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
+ env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") +
":../interpreter/lib/python");
}
+
+ // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT
+ // also, add all packages to PYTHONPATH since there might be transitive dependencies
+ if (SparkInterpreter.useSparkSubmit() &&
+ !getSparkInterpreter().isYarnMode()) {
+
+ String sparkSubmitJars = getSparkConf().get("spark.jars").replace(",", ":");
+
+ if (!"".equals(sparkSubmitJars)) {
+ env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitJars);
+ }
+ }
+
return env;
}
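In effect, the new `setupPySparkEnv()` logic converts the comma-separated `spark.jars` list into a colon-separated string and appends it to `PYTHONPATH`. A rough Python rendering of that behavior (illustrative only; the helper name and inputs are made up, and a `:` separator is inserted explicitly to keep the path well-formed):
```
def extend_pythonpath(env, spark_jars):
    # spark.jars is comma-separated; PYTHONPATH entries are colon-separated.
    jars = spark_jars.replace(",", ":")
    if jars:
        env["PYTHONPATH"] = env.get("PYTHONPATH", "") + ":" + jars
    return env

env = extend_pythonpath({"PYTHONPATH": "../interpreter/lib/python"},
                        "/tmp/a.jar,/tmp/b.jar")
print(env["PYTHONPATH"])  # ../interpreter/lib/python:/tmp/a.jar:/tmp/b.jar
```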
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/cb8e4187/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 16bc4ba..7882303 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -295,7 +295,7 @@ public class SparkInterpreter extends Interpreter {
return (DepInterpreter) p;
}
- private boolean isYarnMode() {
+ public boolean isYarnMode() {
return getProperty("master").startsWith("yarn");
}
@@ -555,7 +555,7 @@ public class SparkInterpreter extends Interpreter {
return (o instanceof String) ? (String) o : "";
}
- private boolean useSparkSubmit() {
+ public static boolean useSparkSubmit() {
return null != System.getenv("SPARK_SUBMIT");
}
@@ -726,7 +726,6 @@ public class SparkInterpreter extends Interpreter {
pathSettings.v_$eq(classpath);
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
-
// set classloader for scala compiler
settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread()
.getContextClassLoader()));
@@ -979,7 +978,7 @@ public class SparkInterpreter extends Interpreter {
}
}
- private List<File> currentClassPath() {
+ public List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
if (cps != null) {