You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/21 15:09:09 UTC
zeppelin git commit: ZEPPELIN-1411. UDF with pyspark not working -
object has no attribute 'parseDataType'
Repository: zeppelin
Updated Branches:
refs/heads/master 1e8559e65 -> c61f1fbce
ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType'
### What is this PR for?
The root cause is that SQLContext's constructor signature changed in Spark 2.0.
Spark 1.6
```
def __init__(self, sparkContext, sqlContext=None):
```
Spark 2.0
```
def __init__(self, sparkContext, sparkSession=None, jsqlContext=None):
```
So we need to create the SQLContext using named parameters; otherwise it would take intp.getSQLContext() as sparkSession, which causes the issue.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1411
### How should this be tested?
Tested using the example code in ZEPPELIN-1411.
### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18260139/9bd702c0-741d-11e6-8b23-946c38a794c3.png)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #1404 from zjffdu/ZEPPELIN-1411 and squashes the following commits:
40b080a [Jeff Zhang] retry
4922de1 [Jeff Zhang] add more logging for Travis CI diagnosis
4fe033d [Jeff Zhang] add unit test
296c63f [Jeff Zhang] ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType'
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c61f1fbc
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c61f1fbc
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c61f1fbc
Branch: refs/heads/master
Commit: c61f1fbced7e184357c3fa37f0e16bf6ccc6ba3f
Parents: 1e8559e
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Sep 14 15:42:39 2016 +0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Sep 21 08:09:02 2016 -0700
----------------------------------------------------------------------
.../apache/zeppelin/spark/SparkInterpreter.java | 16 +++++++---
.../main/resources/python/zeppelin_pyspark.py | 5 ++-
.../zeppelin/rest/AbstractTestRestApi.java | 16 ++++++++--
.../zeppelin/rest/ZeppelinSparkClusterTest.java | 32 ++++++++++++++++++--
.../src/test/resources/log4j.properties | 2 +-
5 files changed, 59 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 02d766f..44c2a74 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -118,7 +118,7 @@ public class SparkInterpreter extends Interpreter {
private Map<String, Object> binder;
private SparkVersion sparkVersion;
- private File outputDir; // class outputdir for scala 2.11
+ private static File outputDir; // class outputdir for scala 2.11
private Object classServer; // classserver for scala 2.11
@@ -603,8 +603,11 @@ public class SparkInterpreter extends Interpreter {
sparkReplClassDir = System.getProperty("java.io.tmpdir");
}
- outputDir = createTempDir(sparkReplClassDir);
-
+ synchronized (sharedInterpreterLock) {
+ if (outputDir == null) {
+ outputDir = createTempDir(sparkReplClassDir);
+ }
+ }
argList.add("-Yrepl-class-based");
argList.add("-Yrepl-outdir");
argList.add(outputDir.getAbsolutePath());
@@ -1307,7 +1310,12 @@ public class SparkInterpreter extends Interpreter {
logger.info("Close interpreter");
if (numReferenceOfSparkContext.decrementAndGet() == 0) {
- sc.stop();
+ if (sparkSession != null) {
+ Utils.invokeMethod(sparkSession, "stop");
+ } else if (sc != null){
+ sc.stop();
+ }
+ sparkSession = null;
sc = null;
if (classServer != null) {
Utils.invokeMethod(classServer, "stop");
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 3e6535f..53465c2 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -218,7 +218,10 @@ java_import(gateway.jvm, "scala.Tuple2")
jconf = intp.getSparkConf()
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-sqlc = SQLContext(sc, intp.getSQLContext())
+if sparkVersion.isSpark2():
+ sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
+else:
+ sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
sqlContext = sqlc
if sparkVersion.isSpark2():
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 580e5a0..eb080fe 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -154,7 +154,7 @@ public abstract class AbstractTestRestApi {
// set spark master and other properties
sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
-
+ sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
pySpark = true;
@@ -171,10 +171,16 @@ public abstract class AbstractTestRestApi {
String sparkHome = getSparkHome();
if (sparkHome != null) {
- sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
+ if (System.getenv("SPARK_MASTER") != null) {
+ sparkIntpSetting.getProperties().setProperty("master", System.getenv("SPARK_MASTER"));
+ } else {
+ sparkIntpSetting.getProperties()
+ .setProperty("master", "spark://" + getHostname() + ":7071");
+ }
sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
// set spark home for pyspark
sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
+ sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
pySpark = true;
sparkR = true;
}
@@ -194,7 +200,11 @@ public abstract class AbstractTestRestApi {
}
private static String getSparkHome() {
- String sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
+ String sparkHome = System.getenv("SPARK_HOME");
+ if (sparkHome != null) {
+ return sparkHome;
+ }
+ sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
System.out.println("SPARK HOME detected " + sparkHome);
return sparkHome;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 1250f9c..4e516db 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -135,11 +135,38 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
config.put("enabled", true);
p.setConfig(config);
p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))");
-// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("55\n", p.getResult().message());
+ if (sparkVersion >= 13) {
+ // run sqlContext test
+ p = note.addParagraph();
+ config = p.getConfig();
+ config.put("enabled", true);
+ p.setConfig(config);
+ p.setText("%pyspark from pyspark.sql import Row\n" +
+ "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+ "df.collect()");
+ note.run(p.getId());
+ waitForFinish(p);
+ assertEquals(Status.FINISHED, p.getStatus());
+ assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+ }
+ if (sparkVersion >= 20) {
+ // run SparkSession test
+ p = note.addParagraph();
+ config = p.getConfig();
+ config.put("enabled", true);
+ p.setConfig(config);
+ p.setText("%pyspark from pyspark.sql import Row\n" +
+ "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+ "df.collect()");
+ note.run(p.getId());
+ waitForFinish(p);
+ assertEquals(Status.FINISHED, p.getStatus());
+ assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+ }
}
ZeppelinServer.notebook.removeNote(note.getId(), null);
}
@@ -166,7 +193,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
+ "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
-// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
@@ -257,6 +283,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
assertEquals(Status.FINISHED, p1.getStatus());
assertEquals("2\n", p1.getResult().message());
}
+ ZeppelinServer.notebook.removeNote(note.getId(), null);
}
/**
@@ -270,7 +297,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
config.put("enabled", true);
p.setConfig(config);
p.setText("%spark print(sc.version)");
-// p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index 376ce00..5007390 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -43,4 +43,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
# Log all JDBC parameters
log4j.logger.org.hibernate.type=ALL
-
+log4j.logger.org.apache.zeppelin.interpreter.remote.RemoteInterpreter=DEBUG