You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/04/17 14:17:58 UTC
zeppelin git commit: [Branch-0.7] Support for Zeppelin Context
redefinition on Python
Repository: zeppelin
Updated Branches:
refs/heads/branch-0.7 22fecd3e5 -> d742b7e68
[Branch-0.7] Support for Zeppelin Context redefinition on Python
Fixes a build problem in Zeppelin branch-0.7.
Hotfix.
Same change as PR #2207.
Author: CloverHearts <cl...@gmail.com>
Closes #2257 from cloverhearts/fix/branch-0.7-build-hotfix and squashes the following commits:
2da0b423b [CloverHearts] fix branch-0.7 build error
a3a615c3b [CloverHearts] [Zeppelin-802] Support for Zeppelin Context redefinition on Python and Pyspark
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d742b7e6
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d742b7e6
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d742b7e6
Branch: refs/heads/branch-0.7
Commit: d742b7e68ea8b8877abd3533d98d3e92515c9136
Parents: 22fecd3
Author: CloverHearts <cl...@gmail.com>
Authored: Mon Apr 17 22:39:08 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Mon Apr 17 23:17:53 2017 +0900
----------------------------------------------------------------------
.../zeppelin/python/PythonInterpreter.java | 2 +-
.../python/PythonInterpreterPandasSql.java | 3 +-
.../main/resources/python/zeppelin_python.py | 16 +++++---
.../zeppelin/python/PythonInterpreterTest.java | 13 ++++++
.../zeppelin/spark/PySparkInterpreter.java | 8 ++--
.../main/resources/python/zeppelin_pyspark.py | 42 ++++++++++++++------
.../zeppelin/spark/PySparkInterpreterTest.java | 15 +++++++
7 files changed, 75 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index df62406..7f6a7eb 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -222,7 +222,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
// Add matplotlib display hook
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
- registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
+ registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()");
}
// Add matplotlib display hook
try {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
index 6bf1970..e73d7b3 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
@@ -87,7 +87,8 @@ public class PythonInterpreterPandasSql extends Interpreter {
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
Interpreter python = getPythonInterpreter();
- return python.interpret("z.show(pysqldf('" + st + "'))\nz._displayhook()", context);
+ return python.interpret(
+ "__zeppelin__.show(pysqldf('" + st + "'))\n__zeppelin__._displayhook()", context);
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/main/resources/python/zeppelin_python.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py
index a537c5d..31b993d 100644
--- a/python/src/main/resources/python/zeppelin_python.py
+++ b/python/src/main/resources/python/zeppelin_python.py
@@ -195,6 +195,7 @@ host = "127.0.0.1"
if len(sys.argv) >= 3:
host = sys.argv[2]
+_zcUserQueryNameSpace = {}
client = GatewayClient(address=host, port=int(sys.argv[1]))
#gateway = JavaGateway(client, auto_convert = True)
@@ -204,8 +205,11 @@ intp = gateway.entry_point
intp.onPythonScriptInitialized(os.getpid())
java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
-z = PyZeppelinContext(intp)
-z._setup_matplotlib()
+z = __zeppelin__ = PyZeppelinContext(intp)
+__zeppelin__._setup_matplotlib()
+
+_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
+_zcUserQueryNameSpace["z"] = z
output = Logger()
sys.stdout = output
@@ -227,7 +231,7 @@ while True :
global_hook = None
try:
- user_hook = z.getHook('post_exec')
+ user_hook = __zeppelin__.getHook('post_exec')
except:
user_hook = None
@@ -263,17 +267,17 @@ while True :
for node in to_run_exec:
mod = ast.Module([node])
code = compile(mod, '<stdin>', 'exec')
- exec(code)
+ exec(code, _zcUserQueryNameSpace)
for node in to_run_single:
mod = ast.Interactive([node])
code = compile(mod, '<stdin>', 'single')
- exec(code)
+ exec(code, _zcUserQueryNameSpace)
for node in to_run_hooks:
mod = ast.Module([node])
code = compile(mod, '<stdin>', 'exec')
- exec(code)
+ exec(code, _zcUserQueryNameSpace)
except:
raise Exception(traceback.format_exc())
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
index b5cd680..837626c 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java
@@ -106,6 +106,19 @@ public class PythonInterpreterTest implements InterpreterOutputListener {
assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("hi\nhi\nhi"));
}
+ @Test
+ public void testRedefinitionZeppelinContext() {
+ String pyRedefinitionCode = "z = 1\n";
+ String pyRestoreCode = "z = __zeppelin__\n";
+ String pyValidCode = "z.input(\"test\")\n";
+
+ assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyValidCode, context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyRedefinitionCode, context).code());
+ assertEquals(InterpreterResult.Code.ERROR, pythonInterpreter.interpret(pyValidCode, context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyRestoreCode, context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyValidCode, context).code());
+ }
+
@Override
public void onUpdateAll(InterpreterOutput out) {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 371578c..3ab5676 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -113,7 +113,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
// Add matplotlib display hook
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
- registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
+ registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()");
}
DepInterpreter depInterpreter = getDepInterpreter();
@@ -381,9 +381,9 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
return new InterpreterResult(Code.ERROR, errorMessage);
}
String jobGroup = sparkInterpreter.getJobGroup(context);
- ZeppelinContext z = sparkInterpreter.getZeppelinContext();
- z.setInterpreterContext(context);
- z.setGui(context.getGui());
+ ZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
+ __zeppelin__.setInterpreterContext(context);
+ __zeppelin__.setGui(context.getGui());
pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup);
statementOutput = null;
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/spark/src/main/resources/python/zeppelin_pyspark.py
----------------------------------------------------------------------
diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py
index 6c39400..5029d59 100644
--- a/spark/src/main/resources/python/zeppelin_pyspark.py
+++ b/spark/src/main/resources/python/zeppelin_pyspark.py
@@ -271,19 +271,37 @@ else:
java_import(gateway.jvm, "scala.Tuple2")
+_zcUserQueryNameSpace = {}
+
jconf = intp.getSparkConf()
conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
-sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
+_zcUserQueryNameSpace["_zsc_"] = _zsc_
+_zcUserQueryNameSpace["sc"] = sc
+
if sparkVersion.isSpark2():
- spark = SparkSession(sc, intp.getSparkSession())
- sqlc = spark._wrapped
+ spark = __zSpark__ = SparkSession(sc, intp.getSparkSession())
+ sqlc = __zSqlc__ = __zSpark__._wrapped
+ _zcUserQueryNameSpace["sqlc"] = sqlc
+ _zcUserQueryNameSpace["__zSqlc__"] = __zSqlc__
+ _zcUserQueryNameSpace["spark"] = spark
+ _zcUserQueryNameSpace["__zSpark__"] = __zSpark__
else:
- sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
-sqlContext = sqlc
+ sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext())
+ _zcUserQueryNameSpace["sqlc"] = sqlc
+ _zcUserQueryNameSpace["__zSqlc__"] = sqlc
+
+sqlContext = __zSqlc__
+_zcUserQueryNameSpace["sqlContext"] = sqlContext
+
+completion = __zeppelin_completion__ = PySparkCompletion(intp)
+_zcUserQueryNameSpace["completion"] = completion
+_zcUserQueryNameSpace["__zeppelin_completion__"] = __zeppelin_completion__
-completion = PySparkCompletion(intp)
-z = PyZeppelinContext(intp.getZeppelinContext())
-z._setup_matplotlib()
+z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext())
+__zeppelin__._setup_matplotlib()
+_zcUserQueryNameSpace["z"] = z
+_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__
while True :
req = intp.getStatements()
@@ -299,7 +317,7 @@ while True :
global_hook = None
try:
- user_hook = z.getHook('post_exec')
+ user_hook = __zeppelin__.getHook('post_exec')
except:
user_hook = None
@@ -334,17 +352,17 @@ while True :
for node in to_run_exec:
mod = ast.Module([node])
code = compile(mod, '<stdin>', 'exec')
- exec(code)
+ exec(code, _zcUserQueryNameSpace)
for node in to_run_single:
mod = ast.Interactive([node])
code = compile(mod, '<stdin>', 'single')
- exec(code)
+ exec(code, _zcUserQueryNameSpace)
for node in to_run_hooks:
mod = ast.Module([node])
code = compile(mod, '<stdin>', 'exec')
- exec(code)
+ exec(code, _zcUserQueryNameSpace)
except:
raise Exception(traceback.format_exc())
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d742b7e6/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
index 3697512..d47a8bd 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java
@@ -123,6 +123,21 @@ public class PySparkInterpreterTest {
}
}
+ @Test
+ public void testRedefinitionZeppelinContext() {
+ if (getSparkVersionNumber() > 11) {
+ String redefinitionCode = "z = 1\n";
+ String restoreCode = "z = __zeppelin__\n";
+ String validCode = "z.input(\"test\")\n";
+
+ assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(redefinitionCode, context).code());
+ assertEquals(InterpreterResult.Code.ERROR, pySparkInterpreter.interpret(validCode, context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(restoreCode, context).code());
+ assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code());
+ }
+ }
+
private class infinityPythonJob implements Runnable {
@Override
public void run() {