Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/03/30 03:53:35 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 165e3a2  [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
165e3a2 is described below

commit 165e3a23ddf407cd61764c62f9361a2cb3bd6b4e
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Mar 24 11:09:09 2020 +0800

    [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
    
    ### What is this PR for?
    
    The root cause of this issue is that java output was not redirected to the interpreter output. This PR fixes it by redirecting java output before interpreting python code, in both PySparkInterpreter and IPySparkInterpreter. A unit test is also added to verify the fix.
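
    A minimal sketch of the redirection pattern this PR applies (mirroring the new code in IPySparkInterpreter.interpret; `context.out` is the interpreter output stream):

    ```java
    PrintStream originalStdout = System.out;
    PrintStream originalStderr = System.err;
    try {
      // Anything the JVM prints (e.g. via py4j calls) now reaches the paragraph output.
      System.setOut(new PrintStream(context.out));
      System.setErr(new PrintStream(context.out));
      return super.interpret(st, context);
    } finally {
      // Always restore the process-wide streams, even if interpret throws.
      System.setOut(originalStdout);
      System.setErr(originalStderr);
    }
    ```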
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4692
    
    ### How should this be tested?
    * A unit test is added; the change was also tested manually.
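
    A condensed sketch of the added check (adapted from IPySparkInterpreterTest; the surrounding test setup is assumed): python calls into the JVM via py4j, and the JVM's stdout should be captured as paragraph output.

    ```java
    InterpreterResult result = interpreter.interpret(
        "sc._jvm.java.lang.System.out.println(\"hello world\")", context);
    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
    // The java-side println is captured into the interpreter output.
    assertEquals("hello world\n",
        context.out.toInterpreterResultMessage().get(0).getData());
    ```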
    
    ### Screenshots (if appropriate)
    
    ![image](https://user-images.githubusercontent.com/164491/77384871-78249300-6dc1-11ea-9cdd-98d17a2ebbf6.png)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3696 from zjffdu/ZEPPELIN-4692 and squashes the following commits:
    
    e1a5ead7e [Jeff Zhang] [ZEPPELIN-4692]. zeppelin pyspark doesn't print java output
    
    (cherry picked from commit 6cdcc5b01250afab52a766ede37c469686f9d492)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../zeppelin/python/IPythonInterpreterTest.java    | 32 +++++++++++----
 .../apache/zeppelin/spark/IPySparkInterpreter.java | 48 ++++++++++++++--------
 .../apache/zeppelin/spark/PySparkInterpreter.java  | 16 +++++++-
 .../zeppelin/spark/IPySparkInterpreterTest.java    |  8 ++++
 4 files changed, 76 insertions(+), 28 deletions(-)

diff --git a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
index 0f302e9..cabcabc 100644
--- a/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/IPythonInterpreterTest.java
@@ -296,15 +296,31 @@ public class IPythonInterpreterTest extends BasePythonInterpreterTest {
         "df = pd.DataFrame(np.random.randn(1000, 4), index=idx, columns=list('ABCD')).cumsum()\n" +
         "import hvplot.pandas\n" +
         "df.hvplot()", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(context.out.toInterpreterResultMessage().get(0).getData(),
+            InterpreterResult.Code.SUCCESS, result.code());
     interpreterResultMessages = context.out.toInterpreterResultMessage();
-    assertEquals(4, interpreterResultMessages.size());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
-    assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
-    // docs_json is the source data of plotting which bokeh would use to render the plotting.
-    assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
+
+    if (isPython2) {
+      // python 2 will have one extra output
+      // %text /home/travis/miniconda/lib/python2.7/site-packages/param/parameterized.py:2812: 
+      // UserWarning: Config option `use_jedi` not recognized by `IPCompleter`.
+      // return inst.__call__(*args,**params)
+      assertEquals(5, interpreterResultMessages.size());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(4).getType());
+      // docs_json is the source data of plotting which bokeh would use to render the plotting.
+      assertTrue(interpreterResultMessages.get(4).getData().contains("docs_json"));
+    } else {
+      assertEquals(4, interpreterResultMessages.size());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(0).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(1).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(2).getType());
+      assertEquals(InterpreterResult.Type.HTML, interpreterResultMessages.get(3).getType());
+      // docs_json is the source data of plotting which bokeh would use to render the plotting.
+      assertTrue(interpreterResultMessages.get(3).getData().contains("docs_json"));
+    }
   }
 
 
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index 6436d92..b1e1baf 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -23,11 +23,13 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.util.Map;
 import java.util.Properties;
 
@@ -91,25 +93,35 @@ public class IPySparkInterpreter extends IPythonInterpreter {
   @Override
   public InterpreterResult interpret(String st,
                                      InterpreterContext context) throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    InterpreterContext.set(context);
-    String jobGroupId = Utils.buildJobGroupId(context);
-    String jobDesc = Utils.buildJobDesc(context);
-    String setJobGroupStmt = "sc.setJobGroup('" +  jobGroupId + "', '" + jobDesc + "')";
-    InterpreterResult result = super.interpret(setJobGroupStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+    // Redirect java stdout/stderr to the interpreter output, because pyspark may call java code.
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      InterpreterContext.set(context);
+      String jobGroupId = Utils.buildJobGroupId(context);
+      String jobDesc = Utils.buildJobDesc(context);
+      String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')";
+      InterpreterResult result = super.interpret(setJobGroupStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setJobGroup");
+      }
+      String pool = "None";
+      if (context.getLocalProperties().containsKey("pool")) {
+        pool = "'" + context.getLocalProperties().get("pool") + "'";
+      }
+      String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
+      result = super.interpret(setPoolStmt, context);
+      if (result.code().equals(InterpreterResult.Code.ERROR)) {
+        return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
+      }
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
     }
-    String pool = "None";
-    if (context.getLocalProperties().containsKey("pool")) {
-      pool = "'" + context.getLocalProperties().get("pool") + "'";
-    }
-    String setPoolStmt = "sc.setLocalProperty('spark.scheduler.pool', " + pool + ")";
-    result = super.interpret(setPoolStmt, context);
-    if (result.code().equals(InterpreterResult.Code.ERROR)) {
-      return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to setPool");
-    }
-    return super.interpret(st, context);
   }
 
   @Override
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 94073e0..f180799 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -25,6 +25,7 @@ import org.apache.zeppelin.interpreter.ZeppelinContext;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.python.IPythonInterpreter;
 import org.apache.zeppelin.python.PythonInterpreter;
 import org.slf4j.Logger;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -125,8 +127,18 @@ public class PySparkInterpreter extends PythonInterpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
-    return super.interpret(st, context);
+    // Redirect java stdout/stderr to the interpreter output, because pyspark may call java code.
+    PrintStream originalStdout = System.out;
+    PrintStream originalStderr = System.err;
+    try {
+      System.setOut(new PrintStream(context.out));
+      System.setErr(new PrintStream(context.out));
+      Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(), context, properties);
+      return super.interpret(st, context);
+    } finally {
+      System.setOut(originalStdout);
+      System.setErr(originalStderr);
+    }
   }
 
   @Override
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index c64ba71..40ab851 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -217,6 +217,14 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("sc", "sc", ""));
 
+    // python call java via py4j
+    context = createInterpreterContext(mockIntpEventClient);
+    result = interpreter.interpret("sc._jvm.java.lang.System.out.println(\"hello world\")", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    interpreterResultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, interpreterResultMessages.size());
+    assertEquals("hello world\n", interpreterResultMessages.get(0).getData());
+
     // pyspark streaming TODO(zjffdu) disable pyspark streaming test temporary
     context = createInterpreterContext(mockIntpEventClient);
     //    result = interpreter.interpret(