You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2016/09/21 15:09:09 UTC

zeppelin git commit: ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType'

Repository: zeppelin
Updated Branches:
  refs/heads/master 1e8559e65 -> c61f1fbce


ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType'

### What is this PR for?
The root cause is that SQLContext's constructor signature changed in Spark 2.0.
Spark 1.6
```
def __init__(self, sparkContext, sqlContext=None):
```
Spark 2.0
```
def __init__(self, sparkContext, sparkSession=None, jsqlContext=None):
```
So we need to create SQLContext using named parameters; otherwise it would take intp.getSQLContext() as sparkSession, which causes the issue.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1411

### How should this be tested?
Tested using the example code in ZEPPELIN-1411.

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18260139/9bd702c0-741d-11e6-8b23-946c38a794c3.png)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zj...@apache.org>

Closes #1404 from zjffdu/ZEPPELIN-1411 and squashes the following commits:

40b080a [Jeff Zhang] retry
4922de1 [Jeff Zhang] log more logging for travis CI diangnose
4fe033d [Jeff Zhang] add unit test
296c63f [Jeff Zhang] ZEPPELIN-1411. UDF with pyspark not working - object has no attribute 'parseDataType'


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c61f1fbc
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c61f1fbc
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c61f1fbc

Branch: refs/heads/master
Commit: c61f1fbced7e184357c3fa37f0e16bf6ccc6ba3f
Parents: 1e8559e
Author: Jeff Zhang <zj...@apache.org>
Authored: Wed Sep 14 15:42:39 2016 +0800
Committer: Lee moon soo <mo...@apache.org>
Committed: Wed Sep 21 08:09:02 2016 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/spark/SparkInterpreter.java | 16 +++++++---
 .../main/resources/python/zeppelin_pyspark.py   |  5 ++-
 .../zeppelin/rest/AbstractTestRestApi.java      | 16 ++++++++--
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 32 ++++++++++++++++++--
 .../src/test/resources/log4j.properties         |  2 +-
 5 files changed, 59 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 02d766f..44c2a74 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -118,7 +118,7 @@ public class SparkInterpreter extends Interpreter {
 
   private Map<String, Object> binder;
   private SparkVersion sparkVersion;
-  private File outputDir;          // class outputdir for scala 2.11
+  private static File outputDir;          // class outputdir for scala 2.11
   private Object classServer;      // classserver for scala 2.11
 
 
@@ -603,8 +603,11 @@ public class SparkInterpreter extends Interpreter {
         sparkReplClassDir = System.getProperty("java.io.tmpdir");
       }
 
-      outputDir = createTempDir(sparkReplClassDir);
-
+      synchronized (sharedInterpreterLock) {
+        if (outputDir == null) {
+          outputDir = createTempDir(sparkReplClassDir);
+        }
+      }
       argList.add("-Yrepl-class-based");
       argList.add("-Yrepl-outdir");
       argList.add(outputDir.getAbsolutePath());
@@ -1307,7 +1310,12 @@ public class SparkInterpreter extends Interpreter {
     logger.info("Close interpreter");
 
     if (numReferenceOfSparkContext.decrementAndGet() == 0) {
-      sc.stop();
+      if (sparkSession != null) {
+        Utils.invokeMethod(sparkSession, "stop");
+      } else if (sc != null){
+        sc.stop();
+      }
+      sparkSession = null;
       sc = null;
       if (classServer != null) {
         Utils.invokeMethod(classServer, "stop");

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 3e6535f..53465c2 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -218,7 +218,10 @@ java_import(gateway.jvm, "scala.Tuple2")
 jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
-sqlc = SQLContext(sc, intp.getSQLContext())
+if sparkVersion.isSpark2():
+  sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
+else:
+  sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
 sqlContext = sqlc
 
 if sparkVersion.isSpark2():

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 580e5a0..eb080fe 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -154,7 +154,7 @@ public abstract class AbstractTestRestApi {
         // set spark master and other properties
         sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
         sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
-
+        sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
         // set spark home for pyspark
         sparkIntpSetting.getProperties().setProperty("spark.home", getSparkHome());
         pySpark = true;
@@ -171,10 +171,16 @@ public abstract class AbstractTestRestApi {
 
         String sparkHome = getSparkHome();
         if (sparkHome != null) {
-          sparkIntpSetting.getProperties().setProperty("master", "spark://" + getHostname() + ":7071");
+          if (System.getenv("SPARK_MASTER") != null) {
+            sparkIntpSetting.getProperties().setProperty("master", System.getenv("SPARK_MASTER"));
+          } else {
+            sparkIntpSetting.getProperties()
+                    .setProperty("master", "spark://" + getHostname() + ":7071");
+          }
           sparkIntpSetting.getProperties().setProperty("spark.cores.max", "2");
           // set spark home for pyspark
           sparkIntpSetting.getProperties().setProperty("spark.home", sparkHome);
+          sparkIntpSetting.getProperties().setProperty("zeppelin.spark.useHiveContext", "false");
           pySpark = true;
           sparkR = true;
         }
@@ -194,7 +200,11 @@ public abstract class AbstractTestRestApi {
   }
 
   private static String getSparkHome() {
-    String sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
+    String sparkHome = System.getenv("SPARK_HOME");
+    if (sparkHome != null) {
+      return sparkHome;
+    }
+    sparkHome = getSparkHomeRecursively(new File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
     System.out.println("SPARK HOME detected " + sparkHome);
     return sparkHome;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 1250f9c..4e516db 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -135,11 +135,38 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
             config.put("enabled", true);
             p.setConfig(config);
             p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))");
-//            p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
             note.run(p.getId());
             waitForFinish(p);
             assertEquals(Status.FINISHED, p.getStatus());
             assertEquals("55\n", p.getResult().message());
+            if (sparkVersion >= 13) {
+                // run sqlContext test
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                p.setText("%pyspark from pyspark.sql import Row\n" +
+                        "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+                        "df.collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+            }
+            if (sparkVersion >= 20) {
+                // run SparkSession test
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                p.setText("%pyspark from pyspark.sql import Row\n" +
+                        "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+                        "df.collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+            }
         }
         ZeppelinServer.notebook.removeNote(note.getId(), null);
     }
@@ -166,7 +193,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
             p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
                     + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
-//            p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
             note.run(p.getId());
             waitForFinish(p);
             assertEquals(Status.FINISHED, p.getStatus());
@@ -257,6 +283,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
             assertEquals(Status.FINISHED, p1.getStatus());
             assertEquals("2\n", p1.getResult().message());
         }
+        ZeppelinServer.notebook.removeNote(note.getId(), null);
     }
 
     /**
@@ -270,7 +297,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
         config.put("enabled", true);
         p.setConfig(config);
         p.setText("%spark print(sc.version)");
-//        p.getRepl("org.apache.zeppelin.spark.SparkInterpreter").open();
         note.run(p.getId());
         waitForFinish(p);
         assertEquals(Status.FINISHED, p.getStatus());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c61f1fbc/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index 376ce00..5007390 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -43,4 +43,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 
-
+log4j.logger.org.apache.zeppelin.interpreter.remote.RemoteInterpreter=DEBUG