You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/08/11 04:08:58 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 2a84f44   [ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter
2a84f44 is described below

commit 2a84f449750b31cfa4448c5c58ecbaad8edb2c93
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Aug 7 20:53:15 2021 +0800

     [ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter
    
    ### What is this PR for?
    
    This PR makes %python.sql work with both the vanilla Python interpreter and the IPython interpreter. Which one is used depends on whether the IPython prerequisites are met.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5479
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/128589268-c016ebe1-a484-4f04-846b-df8a75e3f31f.png)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4198 from zjffdu/ZEPPELIN-5479 and squashes the following commits:
    
    df963616a3 [Jeff Zhang] fix unit test
    46c4f08059 [Jeff Zhang] trim sql
    183660aeb6 [Jeff Zhang] fix code style
    7a3e54e539 [Jeff Zhang] fix code style
    c9e63fa4e0 [Jeff Zhang] [ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter
    
    (cherry picked from commit a8021f0e72cfa375eaf3e2311d84e09353fe8f73)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../apache/zeppelin/python/PythonInterpreter.java  |  22 ++-
 .../python/PythonInterpreterPandasSql.java         |  29 ++-
 .../python/PythonInterpreterPandasSqlTest.java     | 197 +++++++++++++--------
 3 files changed, 155 insertions(+), 93 deletions(-)

diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index 23c0945..147ee49 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -563,13 +563,23 @@ public class PythonInterpreter extends Interpreter {
     String bootstrapCode =
         IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName));
     try {
-      // Add hook explicitly, otherwise python will fail to execute the statement
-      InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
-          InterpreterContext.get());
-      if (result.code() != Code.SUCCESS) {
-        throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
+      if (iPythonInterpreter != null) {
+        InterpreterResult result = iPythonInterpreter.interpret(bootstrapCode,
+                InterpreterContext.get());
+        if (result.code() != Code.SUCCESS) {
+          throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
+        } else {
+          LOGGER.debug("Bootstrap python successfully.");
+        }
       } else {
-        LOGGER.debug("Bootstrap python successfully.");
+        // Add hook explicitly, otherwise python will fail to execute the statement
+        InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
+                InterpreterContext.get());
+        if (result.code() != Code.SUCCESS) {
+          throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
+        } else {
+          LOGGER.debug("Bootstrap python successfully.");
+        }
       }
     } catch (InterpreterException e) {
       throw new IOException(e);
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
index 4fccc3c..6f7cdb0 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
@@ -30,12 +30,11 @@ import java.util.Properties;
 /**
  * SQL over Pandas DataFrame interpreter for %python group
  * <p>
- * Match experience of %sparpk.sql over Spark DataFrame
+ * Match experience of %spark.sql over Spark DataFrame
  */
 public class PythonInterpreterPandasSql extends Interpreter {
-  private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
-
-  private String SQL_BOOTSTRAP_FILE_PY = "python/bootstrap_sql.py";
+  private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
+  private static String SQL_BOOTSTRAP_FILE_PY = "python/bootstrap_sql.py";
 
   private PythonInterpreter pythonInterpreter;
 
@@ -45,20 +44,20 @@ public class PythonInterpreterPandasSql extends Interpreter {
 
   @Override
   public void open() throws InterpreterException {
-    LOG.info("Open Python SQL interpreter instance: {}", this.toString());
-
+    LOGGER.info("Open Python SQL interpreter instance: PythonInterpreterPandasSql");
     try {
-      LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
+      LOGGER.info("Bootstrap PythonInterpreterPandasSql interpreter with {}",
+              SQL_BOOTSTRAP_FILE_PY);
       this.pythonInterpreter = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class);
       this.pythonInterpreter.bootstrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
     } catch (IOException e) {
-      LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
+      LOGGER.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
     }
   }
 
   @Override
   public void close() throws InterpreterException {
-    LOG.info("Close Python SQL interpreter instance: {}", this.toString());
+    LOGGER.info("Close Python SQL interpreter instance: {}", this.toString());
     if (pythonInterpreter != null) {
       pythonInterpreter.close();
     }
@@ -67,14 +66,14 @@ public class PythonInterpreterPandasSql extends Interpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
-    LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
+    LOGGER.info("Running SQL query: '{}' over Pandas DataFrame", st);
     return pythonInterpreter.interpret(
-        "__zeppelin__.show(pysqldf('" + st + "'))\n__zeppelin__._displayhook()", context);
+        "z.show(pysqldf('" + st.trim() + "'))", context);
   }
 
   @Override
-  public void cancel(InterpreterContext context) {
-
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    pythonInterpreter.cancel(context);
   }
 
   @Override
@@ -83,8 +82,8 @@ public class PythonInterpreterPandasSql extends Interpreter {
   }
 
   @Override
-  public int getProgress(InterpreterContext context) {
-    return 0;
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return pythonInterpreter.getProgress(context);
   }
 
 }
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
index beae44a..b8715d8 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
@@ -17,25 +17,32 @@
 
 package org.apache.zeppelin.python;
 
+import com.google.common.collect.Lists;
+import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * In order for this test to work, test env must have installed:
@@ -51,145 +58,191 @@ import static org.junit.Assert.assertTrue;
  * mvn -Dpython.test.exclude='' test -pl python -am
  * </code>
  */
-public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener {
+@RunWith(value = Parameterized.class)
+public class PythonInterpreterPandasSqlTest {
+
+  private static final Logger LOGGER =
+          LoggerFactory.getLogger(PythonInterpreterPandasSqlTest.class);
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+            {true},
+            {false}
+    });
+  }
 
+  private boolean useIPython;
   private InterpreterGroup intpGroup;
-  private PythonInterpreterPandasSql sql;
-  private PythonInterpreter python;
+  private PythonInterpreterPandasSql pandasSqlInterpreter;
+  private PythonInterpreter pythonInterpreter;
+  private IPythonInterpreter ipythonInterpreter;
 
   private InterpreterContext context;
-  InterpreterOutput out;
+
+  public PythonInterpreterPandasSqlTest(boolean useIPython) {
+    this.useIPython = useIPython;
+    LOGGER.info("Test PythonInterpreterPandasSqlTest while useIPython={}", useIPython);
+  }
 
   @Before
   public void setUp() throws Exception {
     Properties p = new Properties();
     p.setProperty("zeppelin.python", "python");
     p.setProperty("zeppelin.python.maxResult", "100");
-    p.setProperty("zeppelin.python.useIPython", "false");
+    p.setProperty("zeppelin.python.useIPython", useIPython + "");
     p.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
 
     intpGroup = new InterpreterGroup();
 
-    out = new InterpreterOutput(this);
-    context = InterpreterContext.builder()
-        .setInterpreterOut(out)
-        .build();
+    context = getInterpreterContext();
     InterpreterContext.set(context);
 
-    python = new PythonInterpreter(p);
-    python.setInterpreterGroup(intpGroup);
-    python.open();
+    pythonInterpreter = new PythonInterpreter(p);
+    ipythonInterpreter = new IPythonInterpreter(p);
+    pandasSqlInterpreter = new PythonInterpreterPandasSql(p);
+
+    pythonInterpreter.setInterpreterGroup(intpGroup);
+    ipythonInterpreter.setInterpreterGroup(intpGroup);
+    pandasSqlInterpreter.setInterpreterGroup(intpGroup);
 
-    sql = new PythonInterpreterPandasSql(p);
-    sql.setInterpreterGroup(intpGroup);
+    List<Interpreter> interpreters =
+            Lists.newArrayList(pythonInterpreter, ipythonInterpreter, pandasSqlInterpreter);
 
-    intpGroup.put("note", Arrays.asList(python, sql));
+    intpGroup.put("session_1", interpreters);
 
+    pythonInterpreter.open();
 
     // to make sure python is running.
-    InterpreterResult ret = python.interpret("print(\"python initialized\")\n", context);
+    InterpreterResult ret = pythonInterpreter.interpret("print(\"python initialized\")\n", context);
     assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
-
-    sql.open();
+    pandasSqlInterpreter.open();
   }
 
   @After
-  public void afterTest() throws IOException, InterpreterException {
-    sql.close();
+  public void afterTest() throws InterpreterException {
+    pandasSqlInterpreter.close();
   }
 
   @Test
   public void dependenciesAreInstalled() throws InterpreterException {
     InterpreterResult ret =
-        python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
+        pythonInterpreter.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
     assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
   }
 
   @Test
   public void errorMessageIfDependenciesNotInstalled() throws InterpreterException {
-    InterpreterResult ret;
-    ret = sql.interpret("SELECT * from something", context);
+    context = getInterpreterContext();
+    InterpreterResult ret = pandasSqlInterpreter.interpret("SELECT * from something", context);
 
     assertNotNull(ret);
-    assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.ERROR, ret.code());
-    assertTrue(ret.message().get(0).getData().contains("no such table: something"));
+    assertEquals(context.out.toString(), InterpreterResult.Code.ERROR, ret.code());
+    if (useIPython) {
+      assertTrue(context.out.toString(),
+              context.out.toString().contains("no such table: something"));
+    } else {
+      assertTrue(ret.toString(), ret.toString().contains("no such table: something"));
+    }
   }
 
   @Test
   public void sqlOverTestDataPrintsTable() throws IOException, InterpreterException {
-    InterpreterResult ret;
-    // given
-    //String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
-    ret = python.interpret("import pandas as pd", context);
-    ret = python.interpret("import numpy as np", context);
+    InterpreterResult ret = pythonInterpreter.interpret("import pandas as pd\nimport numpy as np", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+
     // DataFrame df2 \w test data
-    ret = python.interpret("df2 = pd.DataFrame({ 'age'  : np.array([33, 51, 51, 34]), " +
+    ret = pythonInterpreter.interpret("df2 = pd.DataFrame({ 'age'  : np.array([33, 51, 51, 34]), " +
         "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
     assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
 
     //when
-    ret = sql.interpret("select name, age from df2 where age < 40", context);
+    context = getInterpreterContext();
+    ret = pandasSqlInterpreter.interpret("select name, age from df2 where age < 40", context);
 
     //then
-    assertEquals(new String(out.getOutputAt(1).toByteArray()),
-        InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE,
-        out.getOutputAt(1).getType());
-    assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("moon\t33") > 0);
-    assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("park\t34") > 0);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(context.out.toString(), Type.TABLE,
+            context.out.toInterpreterResultMessage().get(0).getType());
+    assertTrue(context.out.toString().indexOf("moon\t33") > 0);
+    assertTrue(context.out.toString().indexOf("park\t34") > 0);
 
     assertEquals(InterpreterResult.Code.SUCCESS,
-        sql.interpret("select case when name==\"aa\" then name else name end from df2",
-            context).code());
+        pandasSqlInterpreter.interpret(
+                "select case when name==\"aa\" then name else name end from df2",
+                context).code());
   }
 
   @Test
-  public void badSqlSyntaxFails() throws IOException, InterpreterException {
+  public void testInIPython() throws IOException, InterpreterException {
+    InterpreterResult ret =
+            pythonInterpreter.interpret("import pandas as pd\nimport numpy as np", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    // DataFrame df2 \w test data
+    ret = pythonInterpreter.interpret("df2 = pd.DataFrame({ 'age'  : np.array([33, 51, 51, 34]), " +
+            "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
+    assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+
     //when
-    InterpreterResult ret = sql.interpret("select wrong syntax", context);
+    ret = pandasSqlInterpreter.interpret("select name, age from df2 where age < 40", context);
+
+    //then
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(context.out.toString(), Type.TABLE,
+            context.out.toInterpreterResultMessage().get(1).getType());
+    assertTrue(context.out.toString().indexOf("moon\t33") > 0);
+    assertTrue(context.out.toString().indexOf("park\t34") > 0);
+
+    assertEquals(InterpreterResult.Code.SUCCESS,
+            pandasSqlInterpreter.interpret(
+                    "select case when name==\"aa\" then name else name end from df2",
+                    context).code());
+  }
+
+  @Test
+  public void badSqlSyntaxFails() throws InterpreterException {
+    //when
+    context = getInterpreterContext();
+    InterpreterResult ret = pandasSqlInterpreter.interpret("select wrong syntax", context);
 
     //then
     assertNotNull("Interpreter returned 'null'", ret);
-    assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
+    assertEquals(context.out.toString(), InterpreterResult.Code.ERROR, ret.code());
   }
 
   @Test
   public void showDataFrame() throws IOException, InterpreterException {
-    InterpreterResult ret;
-    ret = python.interpret("import pandas as pd", context);
-    ret = python.interpret("import numpy as np", context);
+    pythonInterpreter.interpret("import pandas as pd", context);
+    pythonInterpreter.interpret("import numpy as np", context);
 
     // given a Pandas DataFrame with an index and non-text data
-    ret = python.interpret("index = pd.Index([10, 11, 12, 13], name='index_name')", context);
-    ret = python.interpret("d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
-    ret = python.interpret("df1 = pd.DataFrame(d1, index=index)", context);
+    pythonInterpreter.interpret(
+            "index = pd.Index([10, 11, 12, 13], name='index_name')", context);
+    pythonInterpreter.interpret(
+            "d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
+    InterpreterResult ret = pythonInterpreter.interpret(
+            "df1 = pd.DataFrame(d1, index=index)", context);
     assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
 
     // when
-    ret = python.interpret("z.show(df1, show_index=True)", context);
+    context = getInterpreterContext();
+    ret = pythonInterpreter.interpret("z.show(df1, show_index=True)", context);
 
     // then
-    assertEquals(new String(out.getOutputAt(0).toByteArray()),
-        InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(new String(out.getOutputAt(1).toByteArray()),
-        Type.TABLE, out.getOutputAt(1).getType());
-    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("index_name"));
-    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("nan"));
-    assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("6.7"));
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, ret.code());
+    assertEquals(context.out.toString(), Type.TABLE,
+            context.out.toInterpreterResultMessage().get(0).getType());
+    assertTrue(context.out.toString().contains("index_name"));
+    assertTrue(context.out.toString().contains("nan"));
+    assertTrue(context.out.toString().contains("6.7"));
   }
 
-  @Override
-  public void onUpdateAll(InterpreterOutput out) {
-
-  }
-
-  @Override
-  public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
-
-  }
-
-  @Override
-  public void onUpdate(int index, InterpreterResultMessageOutput out) {
-
+  private InterpreterContext getInterpreterContext() {
+    return InterpreterContext.builder()
+            .setNoteId("noteId")
+            .setParagraphId("paragraphId")
+            .setInterpreterOut(new InterpreterOutput())
+            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+            .build();
   }
 }