You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/08/11 04:08:58 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 2a84f44 [ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter
2a84f44 is described below
commit 2a84f449750b31cfa4448c5c58ecbaad8edb2c93
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Aug 7 20:53:15 2021 +0800
[ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter
### What is this PR for?
This PR makes %python.sql work with both the vanilla Python interpreter and the IPython interpreter. Which of the two is used depends on whether the IPython prerequisites are met.
### What type of PR is it?
[ Improvement ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5479
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/128589268-c016ebe1-a484-4f04-846b-df8a75e3f31f.png)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4198 from zjffdu/ZEPPELIN-5479 and squashes the following commits:
df963616a3 [Jeff Zhang] fix unit test
46c4f08059 [Jeff Zhang] trim sql
183660aeb6 [Jeff Zhang] fix code style
7a3e54e539 [Jeff Zhang] fix code style
c9e63fa4e0 [Jeff Zhang] [ZEPPELIN-5479] %python.sql doesn't work with ipython interpreter
(cherry picked from commit a8021f0e72cfa375eaf3e2311d84e09353fe8f73)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../apache/zeppelin/python/PythonInterpreter.java | 22 ++-
.../python/PythonInterpreterPandasSql.java | 29 ++-
.../python/PythonInterpreterPandasSqlTest.java | 197 +++++++++++++--------
3 files changed, 155 insertions(+), 93 deletions(-)
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index 23c0945..147ee49 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -563,13 +563,23 @@ public class PythonInterpreter extends Interpreter {
String bootstrapCode =
IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName));
try {
- // Add hook explicitly, otherwise python will fail to execute the statement
- InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
- InterpreterContext.get());
- if (result.code() != Code.SUCCESS) {
- throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
+ if (iPythonInterpreter != null) {
+ InterpreterResult result = iPythonInterpreter.interpret(bootstrapCode,
+ InterpreterContext.get());
+ if (result.code() != Code.SUCCESS) {
+ throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
+ } else {
+ LOGGER.debug("Bootstrap python successfully.");
+ }
} else {
- LOGGER.debug("Bootstrap python successfully.");
+ // Add hook explicitly, otherwise python will fail to execute the statement
+ InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()",
+ InterpreterContext.get());
+ if (result.code() != Code.SUCCESS) {
+ throw new IOException("Fail to run bootstrap script: " + resourceName + "\n" + result);
+ } else {
+ LOGGER.debug("Bootstrap python successfully.");
+ }
}
} catch (InterpreterException e) {
throw new IOException(e);
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
index 4fccc3c..6f7cdb0 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
@@ -30,12 +30,11 @@ import java.util.Properties;
/**
* SQL over Pandas DataFrame interpreter for %python group
* <p>
- * Match experience of %sparpk.sql over Spark DataFrame
+ * Match experience of %spark.sql over Spark DataFrame
*/
public class PythonInterpreterPandasSql extends Interpreter {
- private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
-
- private String SQL_BOOTSTRAP_FILE_PY = "python/bootstrap_sql.py";
+ private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
+ private static String SQL_BOOTSTRAP_FILE_PY = "python/bootstrap_sql.py";
private PythonInterpreter pythonInterpreter;
@@ -45,20 +44,20 @@ public class PythonInterpreterPandasSql extends Interpreter {
@Override
public void open() throws InterpreterException {
- LOG.info("Open Python SQL interpreter instance: {}", this.toString());
-
+ LOGGER.info("Open Python SQL interpreter instance: PythonInterpreterPandasSql");
try {
- LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
+ LOGGER.info("Bootstrap PythonInterpreterPandasSql interpreter with {}",
+ SQL_BOOTSTRAP_FILE_PY);
this.pythonInterpreter = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class);
this.pythonInterpreter.bootstrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
} catch (IOException e) {
- LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
+ LOGGER.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
}
}
@Override
public void close() throws InterpreterException {
- LOG.info("Close Python SQL interpreter instance: {}", this.toString());
+ LOGGER.info("Close Python SQL interpreter instance: {}", this.toString());
if (pythonInterpreter != null) {
pythonInterpreter.close();
}
@@ -67,14 +66,14 @@ public class PythonInterpreterPandasSql extends Interpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
- LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
+ LOGGER.info("Running SQL query: '{}' over Pandas DataFrame", st);
return pythonInterpreter.interpret(
- "__zeppelin__.show(pysqldf('" + st + "'))\n__zeppelin__._displayhook()", context);
+ "z.show(pysqldf('" + st.trim() + "'))", context);
}
@Override
- public void cancel(InterpreterContext context) {
-
+ public void cancel(InterpreterContext context) throws InterpreterException {
+ pythonInterpreter.cancel(context);
}
@Override
@@ -83,8 +82,8 @@ public class PythonInterpreterPandasSql extends Interpreter {
}
@Override
- public int getProgress(InterpreterContext context) {
- return 0;
+ public int getProgress(InterpreterContext context) throws InterpreterException {
+ return pythonInterpreter.getProgress(context);
}
}
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
index beae44a..b8715d8 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
@@ -17,25 +17,32 @@
package org.apache.zeppelin.python;
+import com.google.common.collect.Lists;
+import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
/**
* In order for this test to work, test env must have installed:
@@ -51,145 +58,191 @@ import static org.junit.Assert.assertTrue;
* mvn -Dpython.test.exclude='' test -pl python -am
* </code>
*/
-public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener {
+@RunWith(value = Parameterized.class)
+public class PythonInterpreterPandasSqlTest {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PythonInterpreterPandasSqlTest.class);
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {true},
+ {false}
+ });
+ }
+ private boolean useIPython;
private InterpreterGroup intpGroup;
- private PythonInterpreterPandasSql sql;
- private PythonInterpreter python;
+ private PythonInterpreterPandasSql pandasSqlInterpreter;
+ private PythonInterpreter pythonInterpreter;
+ private IPythonInterpreter ipythonInterpreter;
private InterpreterContext context;
- InterpreterOutput out;
+
+ public PythonInterpreterPandasSqlTest(boolean useIPython) {
+ this.useIPython = useIPython;
+ LOGGER.info("Test PythonInterpreterPandasSqlTest while useIPython={}", useIPython);
+ }
@Before
public void setUp() throws Exception {
Properties p = new Properties();
p.setProperty("zeppelin.python", "python");
p.setProperty("zeppelin.python.maxResult", "100");
- p.setProperty("zeppelin.python.useIPython", "false");
+ p.setProperty("zeppelin.python.useIPython", useIPython + "");
p.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
intpGroup = new InterpreterGroup();
- out = new InterpreterOutput(this);
- context = InterpreterContext.builder()
- .setInterpreterOut(out)
- .build();
+ context = getInterpreterContext();
InterpreterContext.set(context);
- python = new PythonInterpreter(p);
- python.setInterpreterGroup(intpGroup);
- python.open();
+ pythonInterpreter = new PythonInterpreter(p);
+ ipythonInterpreter = new IPythonInterpreter(p);
+ pandasSqlInterpreter = new PythonInterpreterPandasSql(p);
+
+ pythonInterpreter.setInterpreterGroup(intpGroup);
+ ipythonInterpreter.setInterpreterGroup(intpGroup);
+ pandasSqlInterpreter.setInterpreterGroup(intpGroup);
- sql = new PythonInterpreterPandasSql(p);
- sql.setInterpreterGroup(intpGroup);
+ List<Interpreter> interpreters =
+ Lists.newArrayList(pythonInterpreter, ipythonInterpreter, pandasSqlInterpreter);
- intpGroup.put("note", Arrays.asList(python, sql));
+ intpGroup.put("session_1", interpreters);
+ pythonInterpreter.open();
// to make sure python is running.
- InterpreterResult ret = python.interpret("print(\"python initialized\")\n", context);
+ InterpreterResult ret = pythonInterpreter.interpret("print(\"python initialized\")\n", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
-
- sql.open();
+ pandasSqlInterpreter.open();
}
@After
- public void afterTest() throws IOException, InterpreterException {
- sql.close();
+ public void afterTest() throws InterpreterException {
+ pandasSqlInterpreter.close();
}
@Test
public void dependenciesAreInstalled() throws InterpreterException {
InterpreterResult ret =
- python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
+ pythonInterpreter.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
public void errorMessageIfDependenciesNotInstalled() throws InterpreterException {
- InterpreterResult ret;
- ret = sql.interpret("SELECT * from something", context);
+ context = getInterpreterContext();
+ InterpreterResult ret = pandasSqlInterpreter.interpret("SELECT * from something", context);
assertNotNull(ret);
- assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.ERROR, ret.code());
- assertTrue(ret.message().get(0).getData().contains("no such table: something"));
+ assertEquals(context.out.toString(), InterpreterResult.Code.ERROR, ret.code());
+ if (useIPython) {
+ assertTrue(context.out.toString(),
+ context.out.toString().contains("no such table: something"));
+ } else {
+ assertTrue(ret.toString(), ret.toString().contains("no such table: something"));
+ }
}
@Test
public void sqlOverTestDataPrintsTable() throws IOException, InterpreterException {
- InterpreterResult ret;
- // given
- //String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
- ret = python.interpret("import pandas as pd", context);
- ret = python.interpret("import numpy as np", context);
+ InterpreterResult ret = pythonInterpreter.interpret("import pandas as pd\nimport numpy as np", context);
+ assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+
// DataFrame df2 \w test data
- ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), " +
+ ret = pythonInterpreter.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), " +
"'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
//when
- ret = sql.interpret("select name, age from df2 where age < 40", context);
+ context = getInterpreterContext();
+ ret = pandasSqlInterpreter.interpret("select name, age from df2 where age < 40", context);
//then
- assertEquals(new String(out.getOutputAt(1).toByteArray()),
- InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(new String(out.getOutputAt(1).toByteArray()), Type.TABLE,
- out.getOutputAt(1).getType());
- assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("moon\t33") > 0);
- assertTrue(new String(out.getOutputAt(1).toByteArray()).indexOf("park\t34") > 0);
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(context.out.toString(), Type.TABLE,
+ context.out.toInterpreterResultMessage().get(0).getType());
+ assertTrue(context.out.toString().indexOf("moon\t33") > 0);
+ assertTrue(context.out.toString().indexOf("park\t34") > 0);
assertEquals(InterpreterResult.Code.SUCCESS,
- sql.interpret("select case when name==\"aa\" then name else name end from df2",
- context).code());
+ pandasSqlInterpreter.interpret(
+ "select case when name==\"aa\" then name else name end from df2",
+ context).code());
}
@Test
- public void badSqlSyntaxFails() throws IOException, InterpreterException {
+ public void testInIPython() throws IOException, InterpreterException {
+ InterpreterResult ret =
+ pythonInterpreter.interpret("import pandas as pd\nimport numpy as np", context);
+ assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+ // DataFrame df2 \w test data
+ ret = pythonInterpreter.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), " +
+ "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
+ assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
+
//when
- InterpreterResult ret = sql.interpret("select wrong syntax", context);
+ ret = pandasSqlInterpreter.interpret("select name, age from df2 where age < 40", context);
+
+ //then
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(context.out.toString(), Type.TABLE,
+ context.out.toInterpreterResultMessage().get(1).getType());
+ assertTrue(context.out.toString().indexOf("moon\t33") > 0);
+ assertTrue(context.out.toString().indexOf("park\t34") > 0);
+
+ assertEquals(InterpreterResult.Code.SUCCESS,
+ pandasSqlInterpreter.interpret(
+ "select case when name==\"aa\" then name else name end from df2",
+ context).code());
+ }
+
+ @Test
+ public void badSqlSyntaxFails() throws InterpreterException {
+ //when
+ context = getInterpreterContext();
+ InterpreterResult ret = pandasSqlInterpreter.interpret("select wrong syntax", context);
//then
assertNotNull("Interpreter returned 'null'", ret);
- assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
+ assertEquals(context.out.toString(), InterpreterResult.Code.ERROR, ret.code());
}
@Test
public void showDataFrame() throws IOException, InterpreterException {
- InterpreterResult ret;
- ret = python.interpret("import pandas as pd", context);
- ret = python.interpret("import numpy as np", context);
+ pythonInterpreter.interpret("import pandas as pd", context);
+ pythonInterpreter.interpret("import numpy as np", context);
// given a Pandas DataFrame with an index and non-text data
- ret = python.interpret("index = pd.Index([10, 11, 12, 13], name='index_name')", context);
- ret = python.interpret("d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
- ret = python.interpret("df1 = pd.DataFrame(d1, index=index)", context);
+ pythonInterpreter.interpret(
+ "index = pd.Index([10, 11, 12, 13], name='index_name')", context);
+ pythonInterpreter.interpret(
+ "d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
+ InterpreterResult ret = pythonInterpreter.interpret(
+ "df1 = pd.DataFrame(d1, index=index)", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
// when
- ret = python.interpret("z.show(df1, show_index=True)", context);
+ context = getInterpreterContext();
+ ret = pythonInterpreter.interpret("z.show(df1, show_index=True)", context);
// then
- assertEquals(new String(out.getOutputAt(0).toByteArray()),
- InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(new String(out.getOutputAt(1).toByteArray()),
- Type.TABLE, out.getOutputAt(1).getType());
- assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("index_name"));
- assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("nan"));
- assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("6.7"));
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(context.out.toString(), Type.TABLE,
+ context.out.toInterpreterResultMessage().get(0).getType());
+ assertTrue(context.out.toString().contains("index_name"));
+ assertTrue(context.out.toString().contains("nan"));
+ assertTrue(context.out.toString().contains("6.7"));
}
- @Override
- public void onUpdateAll(InterpreterOutput out) {
-
- }
-
- @Override
- public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
-
- }
-
- @Override
- public void onUpdate(int index, InterpreterResultMessageOutput out) {
-
+ private InterpreterContext getInterpreterContext() {
+ return InterpreterContext.builder()
+ .setNoteId("noteId")
+ .setParagraphId("paragraphId")
+ .setInterpreterOut(new InterpreterOutput())
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+ .build();
}
}