You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/10/09 04:09:56 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5049]. NullPointerException when running z.run(noteId, paragraphId)

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 2615f41  [ZEPPELIN-5049]. NullPointerException when running z.run(noteId, paragraphId)
2615f41 is described below

commit 2615f41b73c8ffb7fbb9e6de25d1ed4d208203ca
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Sep 24 11:21:50 2020 +0800

    [ZEPPELIN-5049]. NullPointerException when running z.run(noteId, paragraphId)
    
    # What is this PR for?
    
    The root cause is that the interpreter context of the Python thread is null. This PR fixes the issue by calling the method `setInterpreterContextInPython` from the Python side. The fix is applied to both PySparkInterpreter and IPySparkInterpreter.
    
    ### What type of PR is it?
    [Bug Fix ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5049
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3922 from zjffdu/ZEPPELIN-5049 and squashes the following commits:
    
    84a196b17 [Jeff Zhang] [ZEPPELIN-5049]. NullPointerException when running z.run(noteId, paragraphId)
    
    (cherry picked from commit 8b04a94f1a2645745310c4a765685b2f1547de5e)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../org/apache/zeppelin/spark/IPySparkInterpreter.java    | 15 +++++++++++++++
 .../org/apache/zeppelin/spark/PySparkInterpreter.java     | 11 +++++++++++
 .../zeppelin/integration/ZeppelinSparkClusterTest.java    |  2 +-
 3 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index b2a1bc1..60611ce 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -41,6 +41,7 @@ public class IPySparkInterpreter extends IPythonInterpreter {
 
   private SparkInterpreter sparkInterpreter;
   private boolean opened = false;
+  private InterpreterContext curIntpContext;
 
   public IPySparkInterpreter(Properties property) {
     super(property);
@@ -116,6 +117,14 @@ public class IPySparkInterpreter extends IPythonInterpreter {
       if (result.code().equals(InterpreterResult.Code.ERROR)) {
         return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
       }
+
+      this.curIntpContext = context;
+      String setInptContextStmt = "intp.setInterpreterContextInPython()";
+      result = super.interpret(setInptContextStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setCurIntpContext");
+      }
+
       return super.interpret(st, context);
     } finally {
       System.setOut(originalStdout);
@@ -123,6 +132,12 @@ public class IPySparkInterpreter extends IPythonInterpreter {
     }
   }
 
+  // Python side will call InterpreterContext.get() too, but it is in a different thread other than the
+  // java interpreter thread. So we should call this method in python side as well.
+  public void setInterpreterContextInPython() {
+    InterpreterContext.set(curIntpContext);
+  }
+
   @Override
   public void cancel(InterpreterContext context) throws InterpreterException {
     super.cancel(context);
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 945aa40..29ae666 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -51,6 +51,7 @@ public class PySparkInterpreter extends PythonInterpreter {
   private static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class);
 
   private SparkInterpreter sparkInterpreter;
+  private InterpreterContext curIntpContext;
 
   public PySparkInterpreter(Properties property) {
     super(property);
@@ -127,6 +128,7 @@ public class PySparkInterpreter extends PythonInterpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
+    curIntpContext = context;
     // redirect java stdout/stdout to interpreter output. Because pyspark may call java code.
     PrintStream originalStdout = System.out;
     PrintStream originalStderr = System.err;
@@ -134,6 +136,7 @@ public class PySparkInterpreter extends PythonInterpreter {
       System.setOut(new PrintStream(context.out));
       System.setErr(new PrintStream(context.out));
       Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+
       return super.interpret(st, context);
     } finally {
       System.setOut(originalStdout);
@@ -155,6 +158,14 @@ public class PySparkInterpreter extends PythonInterpreter {
     }
     String setPoolStmt = "if 'sc' in locals():\n\tsc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
     callPython(new PythonInterpretRequest(setPoolStmt, false, false));
+
+    callPython(new PythonInterpretRequest("intp.setInterpreterContextInPython()", false, false));
+  }
+
+  // Python side will call InterpreterContext.get() too, but it is in a different thread other than the
+  // java interpreter thread. So we should call this method in python side as well.
+  public void setInterpreterContextInPython() {
+    InterpreterContext.set(curIntpContext);
   }
 
   // Run python shell
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
index f8daaa7..3a16670 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/ZeppelinSparkClusterTest.java
@@ -582,7 +582,7 @@ public abstract class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       p21.setText("%spark print(a)");
 
       // run p20 of note2 via paragraph in note1
-      p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note2.getId(), p20.getId()));
+      p0.setText(String.format("%%spark.pyspark z.run(\"%s\", \"%s\")", note2.getId(), p20.getId()));
       note.run(p0.getId(), true);
       waitForFinish(p20);
       assertEquals(Status.FINISHED, p20.getStatus());