You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2018/02/20 02:41:12 UTC
zeppelin git commit: ZEPPELIN-3234. z.show() compatibility with previous release
Repository: zeppelin
Updated Branches:
refs/heads/master dfcd738df -> 3c502cd94
ZEPPELIN-3234. z.show() compatibility with previous release
### What is this PR for?
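Enhance the ZeppelinContext in IPySparkInterpreter: it now reuses the Spark interpreter's ZeppelinContext, so that z.show() on a Spark DataFrame works as it did in the previous release.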
### What type of PR is it?
[Bug Fix | Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3234
### How should this be tested?
* Unit test is added
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #2807 from zjffdu/ZEPPELIN-3234 and squashes the following commits:
39637ee [Jeff Zhang] ZEPPELIN-3234. z.show() compatibility with previous release
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/3c502cd9
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/3c502cd9
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/3c502cd9
Branch: refs/heads/master
Commit: 3c502cd948e9b877adea9c6589ab42d126cd4fbc
Parents: dfcd738
Author: Jeff Zhang <zj...@apache.org>
Authored: Thu Feb 15 14:19:00 2018 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Feb 20 10:41:06 2018 +0800
----------------------------------------------------------------------
.../zeppelin/python/IPythonInterpreter.java | 16 +++++++++++-----
.../zeppelin/spark/IPySparkInterpreter.java | 6 ++++++
.../main/resources/python/zeppelin_ipyspark.py | 14 ++++++++++++++
.../zeppelin/spark/IPySparkInterpreterTest.java | 18 ++++++++++++++++++
.../src/test/resources/log4j.properties | 3 +--
5 files changed, 50 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3c502cd9/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
index 81cfeb2..8078670 100644
--- a/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/IPythonInterpreter.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -76,7 +77,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
private IPythonClient ipythonClient;
private GatewayServer gatewayServer;
- private PythonZeppelinContext zeppelinContext;
+ protected BaseZeppelinContext zeppelinContext;
private String pythonExecutable;
private long ipythonLaunchTimeout;
private String additionalPythonPath;
@@ -114,6 +115,12 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
this.useBuiltinPy4j = add;
}
+ public BaseZeppelinContext buildZeppelinContext() {
+ return new PythonZeppelinContext(
+ getInterpreterGroup().getInterpreterHookRegistry(),
+ Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000")));
+ }
+
@Override
public void open() throws InterpreterException {
try {
@@ -130,9 +137,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
ipythonLaunchTimeout = Long.parseLong(
getProperty("zeppelin.ipython.launch.timeout", "30000"));
- this.zeppelinContext = new PythonZeppelinContext(
- getInterpreterGroup().getInterpreterHookRegistry(),
- Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000")));
+ this.zeppelinContext = buildZeppelinContext();
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
@@ -312,6 +317,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
public InterpreterResult interpret(String st, InterpreterContext context) {
zeppelinContext.setGui(context.getGui());
zeppelinContext.setNoteGui(context.getNoteGui());
+ zeppelinContext.setInterpreterContext(context);
interpreterOutput.setInterpreterOutput(context.out);
ExecuteResponse response =
ipythonClient.stream_execute(ExecuteRequest.newBuilder().setCode(st).build(),
@@ -361,7 +367,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
return completions;
}
- public PythonZeppelinContext getZeppelinContext() {
+ public BaseZeppelinContext getZeppelinContext() {
return zeppelinContext;
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3c502cd9/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 37896f9..a75fda8 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -93,6 +94,11 @@ public class IPySparkInterpreter extends IPythonInterpreter {
}
@Override
+ public BaseZeppelinContext buildZeppelinContext() {
+ return sparkInterpreter.getZeppelinContext();
+ }
+
+ @Override
public void cancel(InterpreterContext context) throws InterpreterException {
super.cancel(context);
sparkInterpreter.cancel(context);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3c502cd9/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
index 324f481..5723f45 100644
--- a/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
+++ b/spark/interpreter/src/main/resources/python/zeppelin_ipyspark.py
@@ -51,3 +51,17 @@ if intp.isSpark2():
sqlContext = sqlc = __zSqlc__ = __zSpark__._wrapped
else:
sqlContext = sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
+
+class IPySparkZeppelinContext(PyZeppelinContext):
+
+ def __init__(self, z):
+ super(IPySparkZeppelinContext, self).__init__(z)
+
+ def show(self, obj):
+ from pyspark.sql import DataFrame
+ if isinstance(obj, DataFrame):
+ print(self.z.showData(obj._jdf))
+ else:
+ super(IPySparkZeppelinContext, self).show(obj)
+
+z = __zeppelin__ = IPySparkZeppelinContext(intp.getZeppelinContext())
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3c502cd9/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 10d87a6..5eaa42c 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -116,6 +116,15 @@ public class IPySparkInterpreterTest {
"| 1| a|\n" +
"| 2| b|\n" +
"+---+---+\n\n", interpreterResultMessages.get(0).getData());
+
+ context = getInterpreterContext();
+ result = iPySparkInterpreter.interpret("z.show(df)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ interpreterResultMessages = context.out.getInterpreterResultMessages();
+ assertEquals(
+ "_1 _2\n" +
+ "1 a\n" +
+ "2 b\n", interpreterResultMessages.get(0).getData());
} else {
result = iPySparkInterpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -127,6 +136,15 @@ public class IPySparkInterpreterTest {
"| 1| a|\n" +
"| 2| b|\n" +
"+---+---+\n\n", interpreterResultMessages.get(0).getData());
+
+ context = getInterpreterContext();
+ result = iPySparkInterpreter.interpret("z.show(df)", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ interpreterResultMessages = context.out.getInterpreterResultMessages();
+ assertEquals(
+ "_1 _2\n" +
+ "1 a\n" +
+ "2 b\n", interpreterResultMessages.get(0).getData());
}
// cancel
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3c502cd9/spark/interpreter/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/resources/log4j.properties b/spark/interpreter/src/test/resources/log4j.properties
index 6958d4c..0dc7c89 100644
--- a/spark/interpreter/src/test/resources/log4j.properties
+++ b/spark/interpreter/src/test/resources/log4j.properties
@@ -46,7 +46,6 @@ log4j.logger.org.hibernate.type=ALL
log4j.logger.org.apache.zeppelin.interpreter=DEBUG
log4j.logger.org.apache.zeppelin.spark=DEBUG
-log4j.logger.org.apache.zeppelin.python.IPythonInterpreter=DEBUG
-log4j.logger.org.apache.zeppelin.python.IPythonClient=DEBUG
+log4j.logger.org.apache.zeppelin.python=DEBUG
log4j.logger.org.apache.spark.repl.Main=INFO