You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/10/09 04:09:56 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5049].
NullPointerException when running z.run(noteId, paragraphId)
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 2615f41 [ZEPPELIN-5049]. NullPointerException when running z.run(noteId, paragraphId)
2615f41 is described below
commit 2615f41b73c8ffb7fbb9e6de25d1ed4d208203ca
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Sep 24 11:21:50 2020 +0800
[ZEPPELIN-5049]. NullPointerException when running z.run(noteId, paragraphId)
# What is this PR for?
The root cause is that the interpreter context of the python thread is null. This PR fixes the issue by calling the method `setInterpreterContextInPython` on the python side. It fixes the issue in both PySparkInterpreter & IPySparkInterpreter.
### What type of PR is it?
[Bug Fix ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5049
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Are there any breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3922 from zjffdu/ZEPPELIN-5049 and squashes the following commits:
84a196b17 [Jeff Zhang] [ZEPPELIN-5049]. NullPointerException when running z.run(noteId, paragraphId)
(cherry picked from commit 8b04a94f1a2645745310c4a765685b2f1547de5e)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../org/apache/zeppelin/spark/IPySparkInterpreter.java | 15 +++++++++++++++
.../org/apache/zeppelin/spark/PySparkInterpreter.java | 11 +++++++++++
.../zeppelin/integration/ZeppelinSparkClusterTest.java | 2 +-
3 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index b2a1bc1..60611ce 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -41,6 +41,7 @@ public class IPySparkInterpreter extends IPythonInterpreter {
private SparkInterpreter sparkInterpreter;
private boolean opened = false;
+ private InterpreterContext curIntpContext;
public IPySparkInterpreter(Properties property) {
super(property);
@@ -116,6 +117,14 @@ public class IPySparkInterpreter extends IPythonInterpreter {
if (result.code().equals(InterpreterResult.Code.ERROR)) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
}
+
+ this.curIntpContext = context;
+ String setInptContextStmt = "intp.setInterpreterContextInPython()";
+ result = super.interpret(setInptContextStmt, context);
+ if (result.code().equals(InterpreterResult.Code.ERROR)) {
+ return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setCurIntpContext");
+ }
+
return super.interpret(st, context);
} finally {
System.setOut(originalStdout);
@@ -123,6 +132,12 @@ public class IPySparkInterpreter extends IPythonInterpreter {
}
}
+ // Python side will call InterpreterContext.get() too, but it is in a different thread other than the
+ // java interpreter thread. So we should call this method in python side as well.
+ public void setInterpreterContextInPython() {
+ InterpreterContext.set(curIntpContext);
+ }
+
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
super.cancel(context);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 945aa40..29ae666 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -51,6 +51,7 @@ public class PySparkInterpreter extends PythonInterpreter {
private static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class);
private SparkInterpreter sparkInterpreter;
+ private InterpreterContext curIntpContext;
public PySparkInterpreter(Properties property) {
super(property);
@@ -127,6 +128,7 @@ public class PySparkInterpreter extends PythonInterpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
+ curIntpContext = context;
// redirect java stdout/stdout to interpreter output. Because pyspark may call java code.
PrintStream originalStdout = System.out;
PrintStream originalStderr = System.err;
@@ -134,6 +136,7 @@ public class PySparkInterpreter extends PythonInterpreter {
System.setOut(new PrintStream(context.out));
System.setErr(new PrintStream(context.out));
Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+
return super.interpret(st, context);
} finally {
System.setOut(originalStdout);
@@ -155,6 +158,14 @@ public class PySparkInterpreter extends PythonInterpreter {
}
String setPoolStmt = "if 'sc' in locals():\n\tsc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
callPython(new PythonInterpretRequest(setPoolStmt, false, false));
+
+ callPython(new PythonInterpretRequest("intp.setInterpreterContextInPython()", false, false));
+ }
+
+ // Python side will call InterpreterContext.get() too, but it is in a different thread other than the
+ // java interpreter thread. So we should call this method in python side as well.
+ public void setInterpreterContextInPython() {
+ InterpreterContext.set(curIntpContext);
}
// Run python shell
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index f8daaa7..3a16670 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -582,7 +582,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
p21.setText("%spark print(a)");
// run p20 of note2 via paragraph in note1
- p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note2.getId(), p20.getId()));
+ p0.setText(String.format("%%spark.pyspark z.run(\"%s\", \"%s\")", note2.getId(), p20.getId()));
note.run(p0.getId(), true);
waitForFinish(p20);
assertEquals(Status.FINISHED, p20.getStatus());