You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/03/30 03:45:11 UTC

[zeppelin] branch master updated: [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cdcc5b  [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
6cdcc5b is described below

commit 6cdcc5b01250afab52a766ede37c469686f9d492
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Mar 24 11:09:09 2020 +0800

    [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
    
    ### What is this PR for?
    
    The root cause of this issue is that we didn't redirect java output to interpreter output. This PR fix it via redirect java output before interpreting python code in both PySparkInterpreter & IPySparkInterpreter. Unit test is also added to verify this feature.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4692
    
    ### How should this be tested?
    * Unit test is added, also manually tested it
    
    ### Screenshots (if appropriate)
    
    ![image](https://user-images.githubusercontent.com/164491/77384871-78249300-6dc1-11ea-9cdd-98d17a2ebbf6.png)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? NO
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3696 from zjffdu/ZEPPELIN-4692 and squashes the following commits:
    
    e1a5ead7e [Jeff Zhang] [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
---
 .../zeppelin/python/IPythonInterpreterTest.java    | 32 +++++++++++----
 .../apache/zeppelin/spark/IPySparkInterpreter.java | 48 ++++++++++++++--------
 .../apache/zeppelin/spark/PySparkInterpreter.java  | 16 +++++++-
 .../zeppelin/spark/IPySparkInterpreterTest.java    |  8 ++++
 4 files changed, 76 insertions(+), 28 deletions(-)

diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 0f302e9..cabcabc 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -296,15 +296,31 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
         "df = pd.DataFrame(np.random.randn(1000, 4), index=idx, columns=list('ABCD')).cumsum()\n" +
         "import hvplot.pandas\n" +
         "df.hvplot()", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(context.out.toInterpreterResultMessage().get(0).getData(),
+            InterpreterResult.Code.SUCCESS, result.code());
     interpreterResultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(4, interpreterResultMessages.size());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
-    // docs_json is the source data of plotting which bokeh would use to render the plotting.
-    assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
+
+    if (isPython2) {
+      // python 2 will have one extra output
+      // %text /home/travis/miniconda/lib/python2.7/site-packages/param/parameterized.py:2812: 
+      // UserWarning: Config option `use_jedi` not recognized by `IPCompleter`.
+      // return inst.__call__(*args,**params)
+      assertEquals(5, interpreterResultMessages.size());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(4).getType());
+      // docs_json is the source data of plotting which bokeh would use to render the plotting.
+      assertTrue(interpreterResultMessages.get(4).getData().contains("docs_json"));
+    } else {
+      assertEquals(4, interpreterResultMessages.size());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
+      // docs_json is the source data of plotting which bokeh would use to render the plotting.
+      assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
+    }
   }
 
 
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 6436d92..b1e1baf 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -23,11 +23,13 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Map;
 import java.util.Properties;
 
@@ -91,25 +93,35 @@ public class IPySparkInterpreter extends IPythonInterpreter {
   @Override
   public InterpreterResult interpret(String st,
                                      InterpreterContext context) throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    InterpreterContext.set(context);
-    String jobGroupId = Utils.buildJobGroupId(context);
-    String jobDesc = Utils.buildJobDesc(context);
-    String setJobGroupStmt = "sc.setJobGroup('" +  jobGroupId + "', '" + jobDesc + "')";
-    InterpreterResult result = super.interpret(setJobGroupStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+    // redirect java stdout/stdout to interpreter output. Because pyspark may call java code.
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      InterpreterContext.set(context);
+      String jobGroupId = Utils.buildJobGroupId(context);
+      String jobDesc = Utils.buildJobDesc(context);
+      String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
+      InterpreterResult result = super.interpret(setJobGroupStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+      }
+      String pool = "None";
+      if (context.getLocalProperties().containsKey("pool")) {
+        pool = "'" + context.getLocalProperties().get("pool") + "'";
+      }
+      String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+      result = super.interpret(setPoolStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+      }
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
     }
-    String pool = "None";
-    if (context.getLocalProperties().containsKey("pool")) {
-      pool = "'" + context.getLocalProperties().get("pool") + "'";
-    }
-    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
-    result = super.interpret(setPoolStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
-    }
-    return super.interpret(st, context);
   }
 
   @Override
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 94073e0..f180799 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -25,6 +25,7 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.apache.zeppelin.python.PythonInterpreter;
 import org.slf4j.Logger;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -125,8 +127,18 @@ public class PySparkInterpreter extends PythonInterpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    return super.interpret(st, context);
+    // redirect java stdout/stdout to interpreter output. Because pyspark may call java code.
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
+    }
   }
 
   @Override
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index c64ba71..40ab851 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -217,6 +217,14 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("sc", "sc", ""));
 
+    // python call java via py4j
+    context = createInterpreterContext(mockIntpEventClient);
+    result = interpreter.interpret("sc._jvm.java.lang.System.out.println(\"hello world\")", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
+
     // pyspark streaming TODO(zjffdu) disable pyspark streaming test temporary
     context = createInterpreterContext(mockIntpEventClient);
     //    result = interpreter.interpret(