Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/06/30 19:40:56 UTC

incubator-zeppelin git commit: [ZEPPELIN-97][ZEPPELIN-134] pyspark issue with mllib api

Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master b15b20136 -> 6a894b09f


[ZEPPELIN-97][ZEPPELIN-134] pyspark issue with mllib api

There was an issue, [ZEPPELIN-97](https://issues.apache.org/jira/browse/ZEPPELIN-97), with pyspark 1.4. The reason is that, starting with pyspark 1.4, the Java gateway is created with the `auto_convert = True` option. This PR fixes the problem by creating Zeppelin's gateway the same way when running against Spark 1.4.
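
A minimal sketch (not part of this commit) of why the option matters; the port value here is hypothetical:

```
from py4j.java_gateway import GatewayClient, JavaGateway

# Without auto_convert, py4j only marshals JavaObject arguments, so passing a
# plain Python list to a JVM method that expects a Java collection fails with:
#   AttributeError: 'list' object has no attribute '_get_object_id'
# With auto_convert=True, py4j converts Python lists, sets, and dicts to their
# Java collection equivalents before sending the command.
client = GatewayClient(port=25333)  # hypothetical port
gateway = JavaGateway(client, auto_convert=True)
```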

This PR also handles [ZEPPELIN-134](https://issues.apache.org/jira/browse/ZEPPELIN-134) by injecting `sqlContext` into the pyspark interpreter.
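
For example, after this change a pyspark paragraph can refer to `sqlContext` directly (a usage sketch, mirroring the test added below):

```
%pyspark
from pyspark.sql.functions import rand
print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())
```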

Finally, it improves error reporting by printing a more verbose stack trace message, for example

from

```
(<type 'exceptions.AttributeError'>, AttributeError("'list' object has no attribute '_get_object_id'",), <traceback object at 0x392b638>)
```

to

```
Traceback (most recent call last):
  File "/var/folders/zt/nd4j13y14jjg7_5pc4xgy7t80000gn/T//zeppelin_pyspark.py", line 110, in <module>
    eval(compiledCode)
  File "<string>", line 3, in <module>
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 1200, in withColumn
    return self.select('*', col.alias(colName))
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 738, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 630, in _jcols
    return self._jseq(cols, _to_java_column)
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 617, in _jseq
    return _to_seq(self.sql_ctx._sc, cols, converter)
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/column.py", line 60, in _to_seq
    return sc._jvm.PythonUtils.toSeq(cols)
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
    [get_command_part(arg, self.pool) for arg in new_args])
  File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
    command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'list' object has no attribute '_get_object_id'
```
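
A minimal sketch of the mechanism, independent of Zeppelin's code: `str(sys.exc_info())` yields only the terse tuple form, while `traceback.format_exc()` returns the fully formatted stack trace:

```
import sys
import traceback

try:
    [].no_such_attribute  # provoke an AttributeError for illustration
except:
    print(str(sys.exc_info()))     # terse: (<type 'exceptions.AttributeError'>, ...)
    print(traceback.format_exc())  # verbose: "Traceback (most recent call last): ..."
```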

Author: Lee moon soo <mo...@apache.org>

Closes #129 from Leemoonsoo/ZEPPELIN-97 and squashes the following commits:

1fa4bf6 [Lee moon soo] apply auto_convert for spark 1.4
bce3c1d [Lee moon soo] Print more stacktrace


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/6a894b09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/6a894b09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/6a894b09

Branch: refs/heads/master
Commit: 6a894b09fbc599286df4db49993056b77b6bb6f6
Parents: b15b201
Author: Lee moon soo <mo...@apache.org>
Authored: Mon Jun 29 14:07:37 2015 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Jun 30 10:40:48 2015 -0700

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java       |  1 +
 .../main/resources/python/zeppelin_pyspark.py    | 16 +++++++++++-----
 .../zeppelin/rest/ZeppelinSparkClusterTest.java  | 19 +++++++++++++++++++
 3 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 95eefd8..092b077 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -137,6 +137,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
     CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python"));
     cmd.addArgument(scriptPath, false);
     cmd.addArgument(Integer.toString(port), false);
+    cmd.addArgument(getJavaSparkContext().version(), false);
     executor = new DefaultExecutor();
     outputStream = new ByteArrayOutputStream();
     PipedOutputStream ps = new PipedOutputStream();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index e29544e..802015d 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -32,7 +32,12 @@ from pyspark.serializers import MarshalSerializer, PickleSerializer
 from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
 
 client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client)
+sparkVersion = sys.argv[2]
+
+if sparkVersion.startswith("1.4"):
+  gateway = JavaGateway(client, auto_convert = True)
+else:
+  gateway = JavaGateway(client)
 
 java_import(gateway.jvm, "org.apache.spark.SparkEnv")
 java_import(gateway.jvm, "org.apache.spark.SparkConf")
@@ -45,15 +50,15 @@ intp.onPythonScriptInitialized()
 
 jsc = intp.getJavaSparkContext()
 
-if jsc.version().startswith("1.2"):
+if sparkVersion.startswith("1.2"):
   java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
-elif jsc.version().startswith("1.3"):
+elif sparkVersion.startswith("1.3"):
   java_import(gateway.jvm, "org.apache.spark.sql.*")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-elif jsc.version().startswith("1.4"):
+elif sparkVersion.startswith("1.4"):
   java_import(gateway.jvm, "org.apache.spark.sql.*")
   java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
 
@@ -64,6 +69,7 @@ jconf = intp.getSparkConf()
 conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 sqlc = SQLContext(sc, intp.getSQLContext())
+sqlContext = sqlc
 
 z = intp.getZeppelinContext()
 
@@ -117,6 +123,6 @@ while True :
        excInnerError = excInnerError[innerErrorStart:]
     intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True)
   except:
-    intp.setStatementsFinished(str(sys.exc_info()), True)
+    intp.setStatementsFinished(traceback.format_exc(), True)
 
   output.reset()

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 758a1e4..fd4a8b3 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -88,6 +88,25 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     }
     ZeppelinServer.notebook.removeNote(note.id());
   }
+  
+  @Test
+  public void pySparkAutoConvertOptionTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote();
+
+    int sparkVersion = getSparkVersionNumber(note);
+
+    if (isPyspark() && sparkVersion >= 14) {   // auto_convert enabled from spark 1.4
+      // run pyspark paragraph
+      Paragraph p = note.addParagraph();
+      p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
+          + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals("10\n", p.getResult().message());
+    }
+    ZeppelinServer.notebook.removeNote(note.id());
+  }
 
   @Test
   public void zRunTest() throws IOException {