Posted to commits@zeppelin.apache.org by mi...@apache.org on 2016/09/27 01:37:38 UTC

zeppelin git commit: ZEPPELIN-1442. UDF cannot be found because 2 instances of SparkSession are created

Repository: zeppelin
Updated Branches:
  refs/heads/master c717daf65 -> 89cf8262e


ZEPPELIN-1442. UDF cannot be found because 2 instances of SparkSession are created

### What is this PR for?
The issue is that zeppelin_pyspark.py creates two SparkSession instances, because it builds the SQLContext first, which creates a SparkSession under the hood. This results in two SparkSession instances on the JVM side, and therefore two Catalog instances as well, so a UDF registered through the SQLContext cannot be found from the SparkSession. This PR creates the SparkSession first and then assigns its internal SQLContext to sqlContext in PySpark, as sketched below.
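
In code terms, the gist of the change to zeppelin_pyspark.py looks roughly like this (a simplified sketch of the Spark 2.x branch only; `jsc`, `gateway`, `conf`, and `intp` are handles provided to the script by the interpreter process over Py4J):

```python
# Simplified sketch of the zeppelin_pyspark.py change (Spark 2.x branch only).
# jsc, gateway, conf and intp are provided by the surrounding interpreter script.
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)

# Before: the SQLContext was built first, which implicitly created a second
# SparkSession (and a second Catalog) on the JVM side:
#   sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())

# After: wrap the single JVM SparkSession first and reuse its internal
# SQLContext, so both handles share one Catalog and one UDF registry.
spark = SparkSession(sc, intp.getSparkSession())
sqlc = sqlContext = spark._wrapped
```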

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-1442

### How should this be tested?
An integration test is added.
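
It can also be verified by hand with a notebook paragraph shaped like the one the test adds: register the UDF through `sqlContext`, then call it through `spark`. Per the test assertion, the expected output is `[Row(len=u'3')]`.

```
%pyspark
sqlContext.udf.register("f1", lambda x: len(x))
spark.sql("select f1(\"abc\") as len").collect()
```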

### Screenshots (if appropriate)
![image](https://cloud.githubusercontent.com/assets/164491/18774832/7f270de4-818f-11e6-9e4f-c4def4353e5c.png)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

…

Author: Jeff Zhang <zj...@apache.org>

Closes #1452 from zjffdu/ZEPPELIN-1442 and squashes the following commits:

a15e3c6 [Jeff Zhang] fix unit test
93060b6 [Jeff Zhang] ZEPPELIN-1442. UDF cannot be found because 2 instances of SparkSession are created


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/89cf8262
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/89cf8262
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/89cf8262

Branch: refs/heads/master
Commit: 89cf8262e6a740c267acad0c040d5d52675d6c00
Parents: c717daf
Author: Jeff Zhang <zj...@apache.org>
Authored: Fri Sep 23 16:18:24 2016 +0800
Committer: Mina Lee <mi...@apache.org>
Committed: Tue Sep 27 10:37:28 2016 +0900

----------------------------------------------------------------------
 .../main/resources/python/zeppelin_pyspark.py   |  6 ++---
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 25 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cf8262/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 53465c2..49e60d4 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -219,14 +219,12 @@ jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 if sparkVersion.isSpark2():
-  sqlc = SQLContext(sparkContext=sc, jsqlContext=intp.getSQLContext())
+  spark = SparkSession(sc, intp.getSparkSession())
+  sqlc = spark._wrapped
 else:
   sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
 sqlContext = sqlc
 
-if sparkVersion.isSpark2():
-  spark = SparkSession(sc, intp.getSparkSession())
-
 completion = PySparkCompletion(intp)
 z = PyZeppelinContext(intp.getZeppelinContext())
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/89cf8262/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 0255068..5084ae7 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -220,6 +220,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
                 assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
                 // TODO (zjffdu), one more \n is appended, need to investigate why.
                 assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
+
+                // test udf
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
+                       "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(len=u'3')]\n", p.getResult().message());
             }
             if (sparkVersion >= 20) {
                 // run SparkSession test
@@ -234,6 +246,19 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
                 waitForFinish(p);
                 assertEquals(Status.FINISHED, p.getStatus());
                 assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
+
+                // test udf
+                p = note.addParagraph();
+                config = p.getConfig();
+                config.put("enabled", true);
+                p.setConfig(config);
+                // use SQLContext to register UDF but use this UDF through SparkSession
+                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" +
+                        "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+                note.run(p.getId());
+                waitForFinish(p);
+                assertEquals(Status.FINISHED, p.getStatus());
+                assertEquals("[Row(len=u'3')]\n", p.getResult().message());
             }
         }
         ZeppelinServer.notebook.removeNote(note.getId(), null);