You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2015/06/30 19:40:56 UTC
incubator-zeppelin git commit: [ZEPPELIN-97][ZEPPELIN-134] pyspark
issue with mllib api
Repository: incubator-zeppelin
Updated Branches:
refs/heads/master b15b20136 -> 6a894b09f
[ZEPPELIN-97][ZEPPELIN-134] pyspark issue with mllib api
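There was an issue, [ZEPPELIN-97](https://issues.apache.org/jira/browse/ZEPPELIN-97), with PySpark 1.4. The cause is that, starting with PySpark 1.4, the Java gateway is created with the `auto_convert = True` option, while Zeppelin created its gateway without it. This PR fixes the problem by passing the Spark version to the Python process and enabling `auto_convert` when running on Spark 1.4.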
This PR also handles [ZEPPELIN-134](https://issues.apache.org/jira/browse/ZEPPELIN-134) by injecting `sqlContext` into the PySpark environment (as an alias of `sqlc`).
Finally, it prints a more verbose stack trace when a statement fails, for example
from
```
(<type 'exceptions.AttributeError'>, AttributeError("'list' object has no attribute '_get_object_id'",), <traceback object at 0x392b638>)
```
to
```
Traceback (most recent call last):
File "/var/folders/zt/nd4j13y14jjg7_5pc4xgy7t80000gn/T//zeppelin_pyspark.py", line 110, in <module>
eval(compiledCode)
File "<string>", line 3, in <module>
File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 1200, in withColumn
return self.select('*', col.alias(colName))
File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 738, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 630, in _jcols
return self._jseq(cols, _to_java_column)
File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/dataframe.py", line 617, in _jseq
return _to_seq(self.sql_ctx._sc, cols, converter)
File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/pyspark/sql/column.py", line 60, in _to_seq
return sc._jvm.PythonUtils.toSeq(cols)
File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
[get_command_part(arg, self.pool) for arg in new_args])
File "/Users/moon/Projects/zeppelin/spark-1.4.0-bin-hadoop2.3/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
command_part = REFERENCE_TYPE + parameter._get_object_id()
AttributeError: 'list' object has no attribute '_get_object_id'
```
Author: Lee moon soo <mo...@apache.org>
Closes #129 from Leemoonsoo/ZEPPELIN-97 and squashes the following commits:
1fa4bf6 [Lee moon soo] apply auto_convert for spark 1.4
bce3c1d [Lee moon soo] Print more stacktrace
Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/6a894b09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/6a894b09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/6a894b09
Branch: refs/heads/master
Commit: 6a894b09fbc599286df4db49993056b77b6bb6f6
Parents: b15b201
Author: Lee moon soo <mo...@apache.org>
Authored: Mon Jun 29 14:07:37 2015 -0700
Committer: Lee moon soo <mo...@apache.org>
Committed: Tue Jun 30 10:40:48 2015 -0700
----------------------------------------------------------------------
.../zeppelin/spark/PySparkInterpreter.java | 1 +
.../main/resources/python/zeppelin_pyspark.py | 16 +++++++++++-----
.../zeppelin/rest/ZeppelinSparkClusterTest.java | 19 +++++++++++++++++++
3 files changed, 31 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 95eefd8..092b077 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -137,6 +137,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python"));
cmd.addArgument(scriptPath, false);
cmd.addArgument(Integer.toString(port), false);
+ cmd.addArgument(getJavaSparkContext().version(), false);
executor = new DefaultExecutor();
outputStream = new ByteArrayOutputStream();
PipedOutputStream ps = new PipedOutputStream();
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index e29544e..802015d 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -32,7 +32,12 @@ from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
client = GatewayClient(port=int(sys.argv[1]))
-gateway = JavaGateway(client)
+sparkVersion = sys.argv[2]
+
+if sparkVersion.startswith("1.4"):
+ gateway = JavaGateway(client, auto_convert = True)
+else:
+ gateway = JavaGateway(client)
java_import(gateway.jvm, "org.apache.spark.SparkEnv")
java_import(gateway.jvm, "org.apache.spark.SparkConf")
@@ -45,15 +50,15 @@ intp.onPythonScriptInitialized()
jsc = intp.getJavaSparkContext()
-if jsc.version().startswith("1.2"):
+if sparkVersion.startswith("1.2"):
java_import(gateway.jvm, "org.apache.spark.sql.SQLContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext")
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
-elif jsc.version().startswith("1.3"):
+elif sparkVersion.startswith("1.3"):
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
-elif jsc.version().startswith("1.4"):
+elif sparkVersion.startswith("1.4"):
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
@@ -64,6 +69,7 @@ jconf = intp.getSparkConf()
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
sqlc = SQLContext(sc, intp.getSQLContext())
+sqlContext = sqlc
z = intp.getZeppelinContext()
@@ -117,6 +123,6 @@ while True :
excInnerError = excInnerError[innerErrorStart:]
intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True)
except:
- intp.setStatementsFinished(str(sys.exc_info()), True)
+ intp.setStatementsFinished(traceback.format_exc(), True)
output.reset()
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/6a894b09/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index 758a1e4..fd4a8b3 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -88,6 +88,25 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
}
ZeppelinServer.notebook.removeNote(note.id());
}
+
+ @Test
+ public void pySparkAutoConvertOptionTest() throws IOException {
+ // create new note
+ Note note = ZeppelinServer.notebook.createNote();
+
+ int sparkVersion = getSparkVersionNumber(note);
+
+ if (isPyspark() && sparkVersion >= 14) { // auto_convert enabled from spark 1.4
+ // run a %pyspark paragraph using the injected sqlContext and withColumn(), which failed without auto_convert (ZEPPELIN-97)
+ Paragraph p = note.addParagraph();
+ p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
+ + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())");
+ note.run(p.getId());
+ waitForFinish(p);
+ assertEquals("10\n", p.getResult().message());
+ }
+ ZeppelinServer.notebook.removeNote(note.id());
+ }
@Test
public void zRunTest() throws IOException {