Posted to commits@zeppelin.apache.org by jo...@apache.org on 2017/03/18 10:50:46 UTC
[2/2] zeppelin git commit: Rewrite PythonInterpreter.
Rewrite PythonInterpreter.
### What is this PR for?
While testing the Python interpreter, I found at least four major issues in the current implementation:
1. Streaming output does not work.
- https://issues.apache.org/jira/browse/ZEPPELIN-2225
2. "..." is printed when the Python code contains indentation.
- https://issues.apache.org/jira/browse/ZEPPELIN-1929
3. Matplotlib output is very slow.
- https://issues.apache.org/jira/browse/ZEPPELIN-1894
- https://issues.apache.org/jira/browse/ZEPPELIN-1360
4. Matplotlib produces unexpected output.
- https://issues.apache.org/jira/browse/ZEPPELIN-2107
So I changed the Python interpreter to use Py4J, modeled on the PySpark interpreter, which should fix the issues above.
I am also going to recreate the conda and Docker support for the Python interpreter ASAP.
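In the `PythonInterpreter.java` diff, the JVM side keeps a pending request that is handed out by `getStatements()` and completed by `setStatementsFinished(out, error)`, while `interpret()` waits for the result. Presumably the Python script calls these through the Py4J gateway, since `PythonInterpreter` is passed to `GatewayServer` as its entry point. The following is a hypothetical, thread-based model of that handshake in plain Python, with no Py4J involved. `StatementChannel`, `submit` and `worker` are illustrative names, and `eval` stands in for actually running the paragraph.

```python
import threading


class StatementChannel:
    """Hypothetical stand-in for the Java-side handshake (no Py4J involved)."""

    def __init__(self):
        self._cond = threading.Condition()
        self._request = None  # statements waiting to be run (JVM -> Python)
        self._result = None   # (output, is_error) reported back (Python -> JVM)

    def submit(self, statements, timeout=5.0):
        """'JVM' side: publish a request, then block until the worker reports back."""
        with self._cond:
            self._request = statements
            self._result = None
            self._cond.notify_all()
            if not self._cond.wait_for(lambda: self._result is not None, timeout):
                raise TimeoutError("worker did not finish the statements in time")
            return self._result

    def get_statements(self, timeout=0.1):
        """'Python' side: wait briefly for a request; return None if there is none."""
        with self._cond:
            self._cond.wait_for(lambda: self._request is not None, timeout)
            request, self._request = self._request, None
            return request

    def set_statements_finished(self, output, is_error):
        """'Python' side: report the result and wake the waiting submitter."""
        with self._cond:
            self._result = (output, is_error)
            self._cond.notify_all()


def worker(channel, stop):
    """Poll for statements until `stop` is set; eval() stands in for real execution."""
    while not stop.is_set():
        statements = channel.get_statements()
        if statements is None:
            continue
        try:
            output, is_error = str(eval(statements)), False
        except Exception as exc:
            output, is_error = f"{type(exc).__name__}: {exc}", True
        channel.set_statements_finished(output, is_error)


if __name__ == "__main__":
    stop = threading.Event()
    channel = StatementChannel()
    thread = threading.Thread(target=worker, args=(channel, stop), daemon=True)
    thread.start()
    print(channel.submit("1 + 2"))  # ('3', False)
    print(channel.submit("1 / 0"))  # ('ZeroDivisionError: division by zero', True)
    stop.set()
    thread.join()
```

The short polling timeout in `get_statements` mirrors the `wait(1000)` loops in the Java code. It lets the worker notice shutdown without needing an extra wake-up signal.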
### What type of PR is it?
Bug Fix | Hot Fix | Refactoring
### How should this be tested?
1. Streaming output does not work.
```
import time
for x in range(0, 5):
    print(x)
    time.sleep(1)
```
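The loop above should print one number per second. That requires forwarding output as the process produces it, rather than collecting it until the statement finishes. The new code hands the child's stdout to a commons-exec `PumpStreamHandler` (see the `PythonInterpreter.java` diff). The removed `PythonProcess` instead defines a `STATEMENT_END` marker. Below is a minimal sketch of the streaming idea in plain Python using `subprocess`. `run_streaming` and `on_line` are hypothetical names, not Zeppelin APIs.

```python
import subprocess
import sys


def run_streaming(code, on_line):
    """Run `code` in a child Python and hand each output line to `on_line` as it arrives.

    Returns the child's exit code.
    """
    # -u makes the child's stdout unbuffered, so each print() reaches the pipe at once.
    proc = subprocess.Popen(
        [sys.executable, "-u", "-c", code],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
    # Iterating the pipe yields lines as they are written, rather than
    # waiting for the child to exit or print an end-of-statement marker.
    for line in proc.stdout:
        on_line(line.rstrip("\n"))
    proc.stdout.close()
    return proc.wait()


if __name__ == "__main__":
    import time

    paragraph = "import time\nfor x in range(3):\n    print(x)\n    time.sleep(0.5)\n"
    start = time.time()
    run_streaming(paragraph, lambda line: print(f"{time.time() - start:4.1f}s  {line}"))
```

The timestamps in the demo should be roughly half a second apart. That spacing is the behavior the first test case checks in a notebook paragraph.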
2. "..." is printed when the Python code contains indentation.
```
def fn():
    print("hi")
fn()
```
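Before this change, the Docker variant started Python as `python -iu` (see the removed line in `PythonDockerInterpreter.java`). That flag runs the interpreter in interactive mode, where indented lines trigger the `...` continuation prompt. The old `interpret()` even stripped `...` from error output. Running the whole paragraph as one compiled unit sidesteps those prompts. The following is a minimal sketch of that idea using the standard `compile`/`exec` builtins and `contextlib.redirect_stdout`. `run_paragraph` is a hypothetical helper, and this is not claimed to be what `zeppelin_python.py` does.

```python
import contextlib
import io


def run_paragraph(source, namespace=None):
    """Compile and execute a whole paragraph at once, returning what it printed.

    Because the text is compiled in "exec" mode as one unit, no interactive
    prompts (">>> " or "... ") are involved, whatever the indentation.
    """
    if namespace is None:
        namespace = {"__name__": "__main__"}
    code = compile(source, "<paragraph>", "exec")
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        exec(code, namespace)
    return buffer.getvalue()


if __name__ == "__main__":
    # The paragraph from test case 2.
    print(run_paragraph('def fn():\n    print("hi")\nfn()\n'), end="")  # hi
```

Passing the same `namespace` dict to successive calls keeps variables alive between paragraphs, much as a long-running interpreter process does.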
3. Matplotlib output is very slow.
```
import matplotlib
import sys
import matplotlib.pyplot as plt
plt.plot([1,2,3])
```
4. Matplotlib produces unexpected output.
```
import matplotlib.pyplot as plt
import matplotlib as mpl
# Make a figure and axes with dimensions as desired.
fig = plt.figure(figsize=(8, 3))
ax1 = fig.add_axes([0.05, 0.80, 0.9, 0.15])
ax2 = fig.add_axes([0.05, 0.475, 0.9, 0.15])
ax3 = fig.add_axes([0.05, 0.15, 0.9, 0.15])
# Set the colormap and norm to correspond to the data for which
# the colorbar will be used.
cmap = mpl.cm.cool
norm = mpl.colors.Normalize(vmin=5, vmax=10)
# ColorbarBase derives from ScalarMappable and puts a colorbar
# in a specified axes, so it has everything needed for a
# standalone colorbar. There are many more kwargs, but the
# following gives a basic continuous colorbar with ticks
# and labels.
cb1 = mpl.colorbar.ColorbarBase(ax1, cmap=cmap,
                                norm=norm,
                                orientation='horizontal')
cb1.set_label('Some Units')
# The second example illustrates the use of a ListedColormap, a
# BoundaryNorm, and extended ends to show the "over" and "under"
# value colors.
cmap = mpl.colors.ListedColormap(['r', 'g', 'b', 'c'])
cmap.set_over('0.25')
cmap.set_under('0.75')
# If a ListedColormap is used, the length of the bounds array must be
# one greater than the length of the color list. The bounds must be
# monotonically increasing.
bounds = [1, 2, 4, 7, 8]
norm = mpl.colors.BoundaryNorm(bounds, cmap.N)
cb2 = mpl.colorbar.ColorbarBase(ax2, cmap=cmap,
                                norm=norm,
                                # to use 'extend', you must
                                # specify two extra boundaries:
                                boundaries=[0] + bounds + [13],
                                extend='both',
                                ticks=bounds,  # optional
                                spacing='proportional',
                                orientation='horizontal')
cb2.set_label('Discrete intervals, some other units')
# The third example illustrates the use of custom length colorbar
# extensions, used on a colorbar with discrete intervals.
cmap = mpl.colors.ListedColormap([[0., .4, 1.], [0., .8, 1.],
                                  [1., .8, 0.], [1., .4, 0.]])
cmap.set_over((1., 0., 0.))
cmap.set_under((0., 0., 1.))
bounds = [-1., -.5, 0., .5, 1.]
norm = mpl.colors.BoundaryNorm(bounds, cmap.N)
cb3 = mpl.colorbar.ColorbarBase(ax3, cmap=cmap,
                                norm=norm,
                                boundaries=[-10] + bounds + [10],
                                extend='both',
                                # Make the length of each extension
                                # the same as the length of the
                                # interior colors:
                                extendfrac='auto',
                                ticks=bounds,
                                spacing='uniform',
                                orientation='horizontal')
cb3.set_label('Custom extension lengths, some other units')
plt.show()
```
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: astroshim <hs...@zepl.com>
Author: Lee moon soo <mo...@apache.org>
Author: HyungSung <hs...@nflabs.com>
Closes #2106 from astroshim/py4jPythonInterpreter and squashes the following commits:
c9b195b [HyungSung] Merge pull request #16 from Leemoonsoo/py4jdocker
e511ebe [Lee moon soo] add PythonDockerInterpreter to interpreter-setting.json
a76b0d8 [Lee moon soo] fix test on python3
2eb5de7 [Lee moon soo] Fix PythonDockerInterpreterTest.java test
9fcf144 [Lee moon soo] Make python docker interpreter work using py4j
8a016c9 [astroshim] Merge branch 'master' into py4jPythonInterpreter
aad7ee8 [astroshim] fix testcase
ac92cdb [astroshim] fix python interpreter testcase
e8570d2 [astroshim] fix ci for pandassql
be5db4d [astroshim] fix pandas sql testcase
f8e19be [astroshim] fix matplotlib testcase
046db88 [astroshim] add testcase
e49ad24 [astroshim] add pandas
60e9820 [astroshim] bug fix about copying library
574bd21 [astroshim] fix interpreter-setting error
a48df58 [astroshim] Merge branch 'master' into py4jPythonInterpreter
3c9585f [astroshim] update interpreter-setting.json
a50179e [astroshim] add conda interpreter
cbbc15c [astroshim] fix py4j path
5ae5120 [astroshim] fix interpreter-setting
f17bff4 [astroshim] fix testcase failure.
af097ac [astroshim] add testcase
c3f5b78 [astroshim] Merge branch 'master' of https://github.com/apache/zeppelin into py4jPythonInterpreter
1395875 [astroshim] removed unnecessary code.
276011e [astroshim] add py4j lib
7304919 [astroshim] initialize python interpreter using py4j
Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/287ffd50
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/287ffd50
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/287ffd50
Branch: refs/heads/master
Commit: 287ffd50e2f061d5fdbe42e37c8857a79420fa80
Parents: 1b5c3a3
Author: astroshim <hs...@zepl.com>
Authored: Sat Mar 18 18:24:16 2017 +0900
Committer: Jongyoul Lee <jo...@apache.org>
Committed: Sat Mar 18 19:50:42 2017 +0900
----------------------------------------------------------------------
python/pom.xml | 37 ++
.../python/PythonDockerInterpreter.java | 37 +-
.../zeppelin/python/PythonInterpreter.java | 465 ++++++++++++++-----
.../python/PythonInterpreterPandasSql.java | 17 +-
.../apache/zeppelin/python/PythonProcess.java | 138 ------
python/src/main/resources/__init__.py | 14 -
python/src/main/resources/bootstrap.py | 234 ----------
python/src/main/resources/bootstrap_input.py | 58 ---
python/src/main/resources/bootstrap_sql.py | 28 --
.../src/main/resources/interpreter-setting.json | 2 +-
.../src/main/resources/python/bootstrap_sql.py | 29 ++
.../main/resources/python/zeppelin_python.py | 276 +++++++++++
.../python/PythonCondaInterpreterTest.java | 34 +-
.../python/PythonDockerInterpreterTest.java | 9 +-
.../python/PythonInterpreterMatplotlibTest.java | 146 +++---
.../python/PythonInterpreterPandasSqlTest.java | 94 ++--
.../zeppelin/python/PythonInterpreterTest.java | 274 +++--------
...ythonInterpreterWithPythonInstalledTest.java | 125 -----
18 files changed, 950 insertions(+), 1067 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/pom.xml
----------------------------------------------------------------------
diff --git a/python/pom.xml b/python/pom.xml
index 3552437..681986c 100644
--- a/python/pom.xml
+++ b/python/pom.xml
@@ -99,6 +99,43 @@
</plugin>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>wagon-maven-plugin</artifactId>
+ <version>1.0</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals><goal>download-single</goal></goals>
+ <configuration>
+ <url>https://pypi.python.org/packages/64/5c/01e13b68e8caafece40d549f232c9b5677ad1016071a48d04cc3895acaa3</url>
+ <fromFile>py4j-${py4j.version}.zip</fromFile>
+ <toFile>${project.build.directory}/../../interpreter/python/py4j-${py4j.version}.zip</toFile>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <unzip src="${project.build.directory}/../../interpreter/python/py4j-${py4j.version}.zip"
+ dest="${project.build.directory}/../../interpreter/python"/>
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
index c41a3cc..582debd 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonDockerInterpreter.java
@@ -21,10 +21,8 @@ import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
+import java.io.*;
+import java.nio.file.Paths;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -37,6 +35,7 @@ public class PythonDockerInterpreter extends Interpreter {
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
+ private File zeppelinHome;
public PythonDockerInterpreter(Properties property) {
super(property);
@@ -44,7 +43,11 @@ public class PythonDockerInterpreter extends Interpreter {
@Override
public void open() {
-
+ if (System.getenv("ZEPPELIN_HOME") != null) {
+ zeppelinHome = new File(System.getenv("ZEPPELIN_HOME"));
+ } else {
+ zeppelinHome = Paths.get("..").toAbsolutePath().toFile();
+ }
}
@Override
@@ -54,6 +57,7 @@ public class PythonDockerInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
+ File pythonScript = new File(getPythonInterpreter().getScriptPath());
InterpreterOutput out = context.out;
Matcher activateMatcher = activatePattern.matcher(st);
@@ -66,7 +70,27 @@ public class PythonDockerInterpreter extends Interpreter {
} else if (activateMatcher.matches()) {
String image = activateMatcher.group(1);
pull(out, image);
- setPythonCommand("docker run -i --rm " + image + " python -iu");
+
+ // mount pythonscript dir
+ String mountPythonScript = "-v " +
+ pythonScript.getParentFile().getAbsolutePath() +
+ ":/_zeppelin_tmp ";
+
+ // mount zeppelin dir
+ String mountPy4j = "-v " +
+ zeppelinHome.getAbsolutePath() +
+ ":/_zeppelin ";
+
+ // set PYTHONPATH
+ String pythonPath = ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PY4JPATH + ":" +
+ ":/_zeppelin/" + PythonInterpreter.ZEPPELIN_PYTHON_LIBS;
+
+ setPythonCommand("docker run -i --rm " +
+ mountPythonScript +
+ mountPy4j +
+ "-e PYTHONPATH=\"" + pythonPath + "\" " +
+ image +
+ " python /_zeppelin_tmp/" + pythonScript.getName());
restartPythonProcess();
out.clear();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
@@ -79,6 +103,7 @@ public class PythonDockerInterpreter extends Interpreter {
}
}
+
public void setPythonCommand(String cmd) {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index d77b59a..f825568 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -18,20 +18,38 @@
package org.apache.zeppelin.python;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
-import java.net.ServerSocket;
+import java.io.OutputStreamWriter;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.*;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.exec.ExecuteResultHandler;
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.PumpStreamHandler;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -39,144 +57,360 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
+import py4j.commands.Command;
/**
* Python interpreter for Zeppelin.
*/
-public class PythonInterpreter extends Interpreter {
+public class PythonInterpreter extends Interpreter implements ExecuteResultHandler {
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreter.class);
-
- public static final String BOOTSTRAP_PY = "/bootstrap.py";
- public static final String BOOTSTRAP_INPUT_PY = "/bootstrap_input.py";
- public static final String ZEPPELIN_PYTHON = "zeppelin.python";
+ public static final String ZEPPELIN_PYTHON = "python/zeppelin_python.py";
+ public static final String ZEPPELIN_PY4JPATH = "interpreter/python/py4j-0.9.2/src";
+ public static final String ZEPPELIN_PYTHON_LIBS = "interpreter/lib/python";
public static final String DEFAULT_ZEPPELIN_PYTHON = "python";
public static final String MAX_RESULT = "zeppelin.python.maxResult";
- private Integer port;
- private GatewayServer gatewayServer;
- private Boolean py4JisInstalled = false;
private InterpreterContext context;
private Pattern errorInLastLine = Pattern.compile(".*(Error|Exception): .*$");
private String pythonPath;
private int maxResult;
+ private String py4jLibPath;
+ private String pythonLibPath;
+
+ private String pythonCommand;
+
+ private GatewayServer gatewayServer;
+ private DefaultExecutor executor;
+ private int port;
+ private InterpreterOutputStream outputStream;
+ private BufferedWriter ins;
+ private PipedInputStream in;
+ private ByteArrayOutputStream input;
+ private String scriptPath;
+ boolean pythonscriptRunning = false;
+ private static final int MAX_TIMEOUT_SEC = 10;
- PythonProcess process = null;
- private String pythonCommand = null;
+ private long pythonPid = 0;
+
+ Integer statementSetNotifier = new Integer(0);
public PythonInterpreter(Properties property) {
super(property);
+ try {
+ File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp"));
+ scriptPath = scriptFile.getAbsolutePath();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
}
- @Override
- public void open() {
- // Add matplotlib display hook
- InterpreterGroup intpGroup = getInterpreterGroup();
- if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
- registerHook(HookType.POST_EXEC_DEV, "\nz._displayhook()");
+ private String workingDir() {
+ URL myURL = getClass().getProtectionDomain().getCodeSource().getLocation();
+ java.net.URI myURI = null;
+ try {
+ myURI = myURL.toURI();
+ } catch (URISyntaxException e1)
+ {}
+ String path = java.nio.file.Paths.get(myURI).toFile().toString();
+ return path;
+ }
+
+ private void createPythonScript() {
+ File out = new File(scriptPath);
+
+ if (out.exists() && out.isDirectory()) {
+ throw new InterpreterException("Can't create python script " + out.getAbsolutePath());
}
- // Add zeppelin-bundled libs to PYTHONPATH
- setPythonPath("../interpreter/lib/python:$PYTHONPATH");
- LOG.info("Starting Python interpreter ---->");
- LOG.info("Python path is set to:" + property.getProperty(ZEPPELIN_PYTHON));
+ copyFile(out, ZEPPELIN_PYTHON);
+ logger.info("File {} created", scriptPath);
+ }
- maxResult = Integer.valueOf(getProperty(MAX_RESULT));
- process = getPythonProcess();
+ public String getScriptPath() {
+ return scriptPath;
+ }
+ private void copyFile(File out, String sourceFile) {
+ ClassLoader classLoader = getClass().getClassLoader();
try {
- process.open();
+ FileOutputStream outStream = new FileOutputStream(out);
+ IOUtils.copy(
+ classLoader.getResourceAsStream(sourceFile),
+ outStream);
+ outStream.close();
} catch (IOException e) {
- LOG.error("Can't start the python process", e);
+ throw new InterpreterException(e);
+ }
+ }
+
+ private void createGatewayServerAndStartScript() throws UnknownHostException {
+ createPythonScript();
+ if (System.getenv("ZEPPELIN_HOME") != null) {
+ py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH;
+ pythonLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PYTHON_LIBS;
+ } else {
+ Path workingPath = Paths.get("..").toAbsolutePath();
+ py4jLibPath = workingPath + File.separator + ZEPPELIN_PY4JPATH;
+ pythonLibPath = workingPath + File.separator + ZEPPELIN_PYTHON_LIBS;
+ }
+
+ port = findRandomOpenPortOnAllLocalInterfaces();
+ gatewayServer = new GatewayServer(this,
+ port,
+ GatewayServer.DEFAULT_PYTHON_PORT,
+ InetAddress.getByName("0.0.0.0"),
+ InetAddress.getByName("0.0.0.0"),
+ GatewayServer.DEFAULT_CONNECT_TIMEOUT,
+ GatewayServer.DEFAULT_READ_TIMEOUT,
+ (List) null);
+
+ gatewayServer.start();
+
+ // Run python shell
+ String pythonCmd = getPythonCommand();
+ CommandLine cmd = CommandLine.parse(pythonCmd);
+
+ if (!pythonCmd.endsWith(".py")) {
+ // PythonDockerInterpreter set pythoncmd with script
+ cmd.addArgument(getScriptPath(), false);
}
+ cmd.addArgument(Integer.toString(port), false);
+ cmd.addArgument(getLocalIp(), false);
+ executor = new DefaultExecutor();
+ outputStream = new InterpreterOutputStream(logger);
+ PipedOutputStream ps = new PipedOutputStream();
+ in = null;
try {
- LOG.info("python PID : " + process.getPid());
- } catch (Exception e) {
- LOG.warn("Can't find python pid process", e);
+ in = new PipedInputStream(ps);
+ } catch (IOException e1) {
+ throw new InterpreterException(e1);
}
+ ins = new BufferedWriter(new OutputStreamWriter(ps));
+ input = new ByteArrayOutputStream();
+
+ PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream, outputStream, in);
+ executor.setStreamHandler(streamHandler);
+ executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
try {
- LOG.info("Bootstrap interpreter with " + BOOTSTRAP_PY);
- bootStrapInterpreter(BOOTSTRAP_PY);
+ Map env = EnvironmentUtils.getProcEnvironment();
+ if (!env.containsKey("PYTHONPATH")) {
+ env.put("PYTHONPATH", py4jLibPath + File.pathSeparator + pythonLibPath);
+ } else {
+ env.put("PYTHONPATH", env.get("PYTHONPATH") + File.pathSeparator +
+ py4jLibPath + File.pathSeparator + pythonLibPath);
+ }
+
+ logger.info("cmd = {}", cmd.toString());
+ executor.execute(cmd, env, this);
+ pythonscriptRunning = true;
} catch (IOException e) {
- LOG.error("Can't execute " + BOOTSTRAP_PY + " to initiate python process", e);
+ throw new InterpreterException(e);
}
- py4JisInstalled = isPy4jInstalled();
- if (py4JisInstalled) {
- port = findRandomOpenPortOnAllLocalInterfaces();
- LOG.info("Py4j gateway port : " + port);
- try {
- gatewayServer = new GatewayServer(this, port);
- gatewayServer.start();
- LOG.info("Bootstrap inputs with " + BOOTSTRAP_INPUT_PY);
- bootStrapInterpreter(BOOTSTRAP_INPUT_PY);
- } catch (IOException e) {
- LOG.error("Can't execute " + BOOTSTRAP_INPUT_PY + " to " +
- "initialize Zeppelin inputs in python process", e);
- }
+ try {
+ input.write("import sys, getopt\n".getBytes());
+ ins.flush();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+ }
+
+ @Override
+ public void open() {
+ // Add matplotlib display hook
+ InterpreterGroup intpGroup = getInterpreterGroup();
+ if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) {
+ registerHook(HookType.POST_EXEC_DEV, "z._displayhook()");
+ }
+ // Add matplotlib display hook
+ try {
+ createGatewayServerAndStartScript();
+ } catch (UnknownHostException e) {
+ throw new InterpreterException(e);
}
}
@Override
public void close() {
- LOG.info("closing Python interpreter <----");
+ pythonscriptRunning = false;
+ pythonScriptInitialized = false;
+
try {
- if (process != null) {
- process.close();
- process = null;
+ ins.flush();
+ ins.close();
+ input.flush();
+ input.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ executor.getWatchdog().destroyProcess();
+ new File(scriptPath).delete();
+ gatewayServer.shutdown();
+
+ // wait until getStatements stop
+ synchronized (statementSetNotifier) {
+ try {
+ statementSetNotifier.wait(1500);
+ } catch (InterruptedException e) {
}
- if (gatewayServer != null) {
- gatewayServer.shutdown();
+ statementSetNotifier.notify();
+ }
+ }
+
+ PythonInterpretRequest pythonInterpretRequest = null;
+ /**
+ * Result class of python interpreter
+ */
+ public class PythonInterpretRequest {
+ public String statements;
+
+ public PythonInterpretRequest(String statements) {
+ this.statements = statements;
+ }
+
+ public String statements() {
+ return statements;
+ }
+ }
+
+ public PythonInterpretRequest getStatements() {
+ synchronized (statementSetNotifier) {
+ while (pythonInterpretRequest == null && pythonscriptRunning && pythonScriptInitialized) {
+ try {
+ statementSetNotifier.wait(1000);
+ } catch (InterruptedException e) {
+ }
}
- } catch (IOException e) {
- LOG.error("Can't close the interpreter", e);
+ PythonInterpretRequest req = pythonInterpretRequest;
+ pythonInterpretRequest = null;
+ return req;
+ }
+ }
+
+ String statementOutput = null;
+ boolean statementError = false;
+ Integer statementFinishedNotifier = new Integer(0);
+
+ public void setStatementsFinished(String out, boolean error) {
+ synchronized (statementFinishedNotifier) {
+ statementOutput = out;
+ statementError = error;
+ statementFinishedNotifier.notify();
+ }
+ }
+
+ boolean pythonScriptInitialized = false;
+ Integer pythonScriptInitializeNotifier = new Integer(0);
+
+ public void onPythonScriptInitialized(long pid) {
+ pythonPid = pid;
+ synchronized (pythonScriptInitializeNotifier) {
+ pythonScriptInitialized = true;
+ pythonScriptInitializeNotifier.notifyAll();
}
}
+ public void appendOutput(String message) throws IOException {
+ outputStream.getInterpreterOutput().write(message);
+ }
+
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
if (cmd == null || cmd.isEmpty()) {
return new InterpreterResult(Code.SUCCESS, "");
}
+
this.context = contextInterpreter;
- String output = sendCommandToPython(cmd);
- InterpreterResult result;
- if (pythonErrorIn(output)) {
- result = new InterpreterResult(Code.ERROR, output.replaceAll("\\.\\.\\.", ""));
+ if (!pythonscriptRunning) {
+ return new InterpreterResult(Code.ERROR, "python process not running"
+ + outputStream.toString());
+ }
+
+ outputStream.setInterpreterOutput(context.out);
+
+ synchronized (pythonScriptInitializeNotifier) {
+ long startTime = System.currentTimeMillis();
+ while (pythonScriptInitialized == false
+ && pythonscriptRunning
+ && System.currentTimeMillis() - startTime < MAX_TIMEOUT_SEC * 1000) {
+ try {
+ pythonScriptInitializeNotifier.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ List<InterpreterResultMessage> errorMessage;
+ try {
+ context.out.flush();
+ errorMessage = context.out.toInterpreterResultMessage();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+
+ if (pythonscriptRunning == false) {
+ // python script failed to initialize and terminated
+ errorMessage.add(new InterpreterResultMessage(
+ InterpreterResult.Type.TEXT, "failed to start python"));
+ return new InterpreterResult(Code.ERROR, errorMessage);
+ }
+ if (pythonScriptInitialized == false) {
+ // timeout. didn't get initialized message
+ errorMessage.add(new InterpreterResultMessage(
+ InterpreterResult.Type.TEXT, "python is not responding"));
+ return new InterpreterResult(Code.ERROR, errorMessage);
+ }
+
+ pythonInterpretRequest = new PythonInterpretRequest(cmd);
+ statementOutput = null;
+
+ synchronized (statementSetNotifier) {
+ statementSetNotifier.notify();
+ }
+
+ synchronized (statementFinishedNotifier) {
+ while (statementOutput == null) {
+ try {
+ statementFinishedNotifier.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ if (statementError) {
+ return new InterpreterResult(Code.ERROR, statementOutput);
} else {
- result = new InterpreterResult(Code.SUCCESS, output);
+
+ try {
+ context.out.flush();
+ } catch (IOException e) {
+ throw new InterpreterException(e);
+ }
+
+ return new InterpreterResult(Code.SUCCESS);
}
- return result;
}
- /**
- * Checks if there is a syntax error or an exception
- *
- * @param output Python interpreter output
- * @return true if syntax error or exception has happened
- */
- private boolean pythonErrorIn(String output) {
- boolean isError = false;
- String[] outputMultiline = output.split("\n");
- Matcher errorMatcher;
- for (String row : outputMultiline) {
- errorMatcher = errorInLastLine.matcher(row);
- if (errorMatcher.find() == true) {
- isError = true;
- break;
- }
+ public void interrupt() throws IOException {
+ if (pythonPid > -1) {
+ logger.info("Sending SIGINT signal to PID : " + pythonPid);
+ Runtime.getRuntime().exec("kill -SIGINT " + pythonPid);
+ } else {
+ logger.warn("Non UNIX/Linux system, close the interpreter");
+ close();
}
- return isError;
}
@Override
public void cancel(InterpreterContext context) {
try {
- process.interrupt();
+ interrupt();
} catch (IOException e) {
- LOG.error("Can't interrupt the python interpreter", e);
+ e.printStackTrace();
}
}
@@ -201,28 +435,17 @@ public class PythonInterpreter extends Interpreter {
return null;
}
- public void setPythonPath(String pythonPath) {
- this.pythonPath = pythonPath;
- }
-
- public PythonProcess getPythonProcess() {
- if (process == null) {
- String binPath = getProperty(ZEPPELIN_PYTHON);
- if (pythonCommand != null) {
- binPath = pythonCommand;
- }
- return new PythonProcess(binPath, pythonPath);
- } else {
- return process;
- }
- }
-
public void setPythonCommand(String cmd) {
+ logger.info("Set Python Command : {}", cmd);
pythonCommand = cmd;
}
public String getPythonCommand() {
- return pythonCommand;
+ if (pythonCommand == null) {
+ return DEFAULT_ZEPPELIN_PYTHON;
+ } else {
+ return pythonCommand;
+ }
}
private Job getRunningJob(String paragraphId) {
@@ -237,24 +460,6 @@ public class PythonInterpreter extends Interpreter {
return foundJob;
}
-
- /**
- * Sends given text to Python interpreter, blocks and returns the output
- * @param cmd Python expression text
- * @return output
- */
- String sendCommandToPython(String cmd) {
- String output = "";
- LOG.debug("Sending : \n" + (cmd.length() > 200 ? cmd.substring(0, 200) + "..." : cmd));
- try {
- output = process.sendAndGetResult(cmd);
- } catch (IOException e) {
- LOG.error("Error when sending commands to python process", e);
- }
- LOG.debug("Got : \n" + output);
- return output;
- }
-
void bootStrapInterpreter(String file) throws IOException {
BufferedReader bootstrapReader = new BufferedReader(
new InputStreamReader(
@@ -265,24 +470,22 @@ public class PythonInterpreter extends Interpreter {
while ((line = bootstrapReader.readLine()) != null) {
bootstrapCode += line + "\n";
}
- if (py4JisInstalled && port != null && port != -1) {
- bootstrapCode = bootstrapCode.replaceAll("\\%PORT\\%", port.toString());
- }
- LOG.info("Bootstrap python interpreter with code from \n " + file);
- sendCommandToPython(bootstrapCode);
+
+ interpret(bootstrapCode, context);
}
public GUI getGui() {
return context.getGui();
}
- public Integer getPy4jPort() {
- return port;
- }
-
- public Boolean isPy4jInstalled() {
- String output = sendCommandToPython("\n\nimport py4j\n");
- return !output.contains("ImportError");
+ String getLocalIp() {
+ try {
+ return Inet4Address.getLocalHost().getHostAddress();
+ } catch (UnknownHostException e) {
+ logger.error("can't get local IP", e);
+ }
+ // fall back to loopback addreess
+ return "127.0.0.1";
}
private int findRandomOpenPortOnAllLocalInterfaces() {
@@ -299,4 +502,16 @@ public class PythonInterpreter extends Interpreter {
public int getMaxResult() {
return maxResult;
}
+
+ @Override
+ public void onProcessComplete(int exitValue) {
+ pythonscriptRunning = false;
+ logger.info("python process terminated. exit code " + exitValue);
+ }
+
+ @Override
+ public void onProcessFailed(ExecuteException e) {
+ pythonscriptRunning = false;
+ logger.error("python process failed", e);
+ }
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
index 381066f..6bf1970 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
public class PythonInterpreterPandasSql extends Interpreter {
private static final Logger LOG = LoggerFactory.getLogger(PythonInterpreterPandasSql.class);
- private String SQL_BOOTSTRAP_FILE_PY = "/bootstrap_sql.py";
+ private String SQL_BOOTSTRAP_FILE_PY = "/python/bootstrap_sql.py";
public PythonInterpreterPandasSql(Properties property) {
super(property);
@@ -64,25 +64,17 @@ public class PythonInterpreterPandasSql extends Interpreter {
@Override
public void open() {
LOG.info("Open Python SQL interpreter instance: {}", this.toString());
+
try {
LOG.info("Bootstrap {} interpreter with {}", this.toString(), SQL_BOOTSTRAP_FILE_PY);
PythonInterpreter python = getPythonInterpreter();
+
python.bootStrapInterpreter(SQL_BOOTSTRAP_FILE_PY);
} catch (IOException e) {
LOG.error("Can't execute " + SQL_BOOTSTRAP_FILE_PY + " to import SQL dependencies", e);
}
}
- /**
- * Checks if Python dependencies pandas and pandasql are installed
- * @return True if they are
- */
- boolean isPandasAndPandasqlInstalled() {
- PythonInterpreter python = getPythonInterpreter();
- String output = python.sendCommandToPython("\n\nimport pandas\nimport pandasql\n");
- return !output.contains("ImportError");
- }
-
@Override
public void close() {
LOG.info("Close Python SQL interpreter instance: {}", this.toString());
@@ -94,7 +86,8 @@ public class PythonInterpreterPandasSql extends Interpreter {
public InterpreterResult interpret(String st, InterpreterContext context) {
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
Interpreter python = getPythonInterpreter();
- return python.interpret("z.show(pysqldf('" + st + "'))", context);
+
+ return python.interpret("z.show(pysqldf('" + st + "'))\nz._displayhook()", context);
}
@Override
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java
----------------------------------------------------------------------
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java b/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java
deleted file mode 100644
index 578ffeb..0000000
--- a/python/src/main/java/org/apache/zeppelin/python/PythonProcess.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.zeppelin.python;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-
-/**
- * Object encapsulated interactive
- * Python process (REPL) used by python interpreter
- */
-public class PythonProcess {
- private static final Logger logger = LoggerFactory.getLogger(PythonProcess.class);
- private static final String STATEMENT_END = "*!?flush reader!?*";
- InputStream stdout;
- OutputStream stdin;
- PrintWriter writer;
- BufferedReader reader;
- Process process;
-
- private String binPath;
- private String pythonPath;
- private long pid;
-
- public PythonProcess(String binPath, String pythonPath) {
- this.binPath = binPath;
- this.pythonPath = pythonPath;
- }
-
- public void open() throws IOException {
- ProcessBuilder builder;
- boolean hasParams = binPath.split(" ").length > 1;
- if (System.getProperty("os.name").toLowerCase().contains("windows")) {
- if (hasParams) {
- builder = new ProcessBuilder(binPath.split(" "));
- } else {
- builder = new ProcessBuilder(binPath, "-iu");
- }
- } else {
- String cmd;
- if (hasParams) {
- cmd = binPath;
- } else {
- cmd = binPath + " -iu";
- }
- builder = new ProcessBuilder("bash", "-c", cmd);
- if (pythonPath != null) {
- builder.environment().put("PYTHONPATH", pythonPath);
- }
- }
-
- builder.redirectErrorStream(true);
- process = builder.start();
- stdout = process.getInputStream();
- stdin = process.getOutputStream();
- writer = new PrintWriter(stdin, true);
- reader = new BufferedReader(new InputStreamReader(stdout));
- try {
- pid = findPid();
- } catch (Exception e) {
- logger.warn("Can't find python pid process", e);
- pid = -1;
- }
- }
-
- public void close() throws IOException {
- process.destroy();
- reader.close();
- writer.close();
- stdin.close();
- stdout.close();
- }
-
- public void interrupt() throws IOException {
- if (pid > -1) {
- logger.info("Sending SIGINT signal to PID : " + pid);
- Runtime.getRuntime().exec("kill -SIGINT " + pid);
- } else {
- logger.warn("Non UNIX/Linux system, close the interpreter");
- close();
- }
- }
-
- public String sendAndGetResult(String cmd) throws IOException {
- writer.println(cmd);
- writer.println();
- writer.println("\"" + STATEMENT_END + "\"");
- StringBuilder output = new StringBuilder();
- String line = null;
-
- while ((line = reader.readLine()) != null &&
- !line.contains(STATEMENT_END)) {
- logger.debug("Read line from python shell : " + line);
- output.append(line + "\n");
- }
-
- return output.toString();
- }
-
- private long findPid() throws NoSuchFieldException, IllegalAccessException {
- long pid = -1;
- if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
- Field f = process.getClass().getDeclaredField("pid");
- f.setAccessible(true);
- pid = f.getLong(process);
- f.setAccessible(false);
- }
- return pid;
- }
-
- public long getPid() {
- return pid;
- }
-
-}
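The deleted `PythonProcess` drove a `python -iu` REPL over stdin/stdout. After each paragraph it wrote a quoted sentinel (`STATEMENT_END`), which the interactive interpreter echoes back, and `sendAndGetResult` collected lines until it saw that sentinel. Because nothing reached the caller until the sentinel arrived, paragraph output could not stream. The REPL's continuation prompts for indented blocks were also likely mixed into the result, since stdout and stderr were merged. The Python sketch below reproduces that read loop; `read_until_sentinel` is a hypothetical name.

```python
import io

STATEMENT_END = "*!?flush reader!?*"


def read_until_sentinel(stream, sentinel=STATEMENT_END):
    """Collect lines from a text stream until one contains the sentinel.

    Mirrors the deleted Java loop: the result is returned only after the
    sentinel (or EOF) is seen, so output cannot be streamed incrementally."""
    output = []
    for line in iter(stream.readline, ""):  # readline() returns "" at EOF
        if sentinel in line:
            break
        output.append(line.rstrip("\n") + "\n")
    return "".join(output)


if __name__ == "__main__":
    # Simulated REPL output: two printed lines, then the echoed sentinel.
    fake_stdout = io.StringIO("0\n1\n'" + STATEMENT_END + "'\nleftover\n")
    print(repr(read_until_sentinel(fake_stdout)))  # '0\n1\n'
```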
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/resources/__init__.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/__init__.py b/python/src/main/resources/__init__.py
deleted file mode 100644
index ec20143..0000000
--- a/python/src/main/resources/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/resources/bootstrap.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/bootstrap.py b/python/src/main/resources/bootstrap.py
deleted file mode 100644
index 0a20a34..0000000
--- a/python/src/main/resources/bootstrap.py
+++ /dev/null
@@ -1,234 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# PYTHON 2 / 3 compatibility :
-# bootstrap.py must be runnable with Python 2 or 3
-
-import os
-import sys
-import signal
-import base64
-import warnings
-from io import BytesIO
-try:
- from StringIO import StringIO
-except ImportError:
- from io import StringIO
-
-def intHandler(signum, frame): # Set the signal handler
- print ("Paragraph interrupted")
- raise KeyboardInterrupt()
-
-signal.signal(signal.SIGINT, intHandler)
-# set prompt as empty string so that java side don't need to remove the prompt.
-sys.ps1=""
-
-def help():
- print("""%html
- <h2>Python Interpreter help</h2>
-
- <h3>Python 2 & 3 compatibility</h3>
- <p>The interpreter is compatible with Python 2 & 3.<br/>
- To change Python version,
- change in the interpreter configuration the python to the
- desired version (example : python=/usr/bin/python3)</p>
-
- <h3>Python modules</h3>
- <p>The interpreter can use all modules already installed
- (with pip, easy_install, etc)</p>
-
- <h3>Forms</h3>
- You must install py4j in order to use
- the form feature (pip install py4j)
- <h4>Input form</h4>
- <pre>print (z.input("f1","defaultValue"))</pre>
- <h4>Selection form</h4>
- <pre>print(z.select("f2", [("o1","1"), ("o2","2")],2))</pre>
- <h4>Checkbox form</h4>
- <pre> print("".join(z.checkbox("f3", [("o1","1"), ("o2","2")],["1"])))</pre>')
-
- <h3>Matplotlib graph</h3>
- <div>The interpreter can display matplotlib graph with
- the function z.show()</div>
- <div> You need to already have matplotlib module installed
- to use this functionality !</div><br/>
- <pre>import matplotlib.pyplot as plt
- plt.figure()
- (.. ..)
- z.show(plt)
- plt.close()
- </pre>
- <div><br/> z.show function can take optional parameters
- to adapt graph dimensions (width and height) and format (png or svg)</div>
- <div><b>example </b>:
- <pre>z.show(plt,width='50px
- z.show(plt,height='150px', fmt='svg') </pre></div>
-
- <h3>Pandas DataFrame</h3>
- <div> You need to have Pandas module installed
- to use this functionality (pip install pandas) !</div><br/>
- <div>The interpreter can visualize Pandas DataFrame
- with the function z.show()
- <pre>
- import pandas as pd
- df = pd.read_csv("bank.csv", sep=";")
- z.show(df)
- </pre></div>
-
- <h3>SQL over Pandas DataFrame</h3>
- <div> You need to have Pandas&Pandasql modules installed
- to use this functionality (pip install pandas pandasql) !</div><br/>
-
- <div>Python interpreter group includes %sql interpreter that can query
- Pandas DataFrames using SQL and visualize results using Zeppelin Table Display System
-
- <pre>
- %python
- import pandas as pd
- df = pd.read_csv("bank.csv", sep=";")
- </pre>
- <br />
- <pre>
- %python.sql
- %sql
- SELECT * from df LIMIT 5
- </pre>
- </div>
- """)
-
-
-class PyZeppelinContext(object):
- """ If py4j is detected, these class will be override
- with the implementation in bootstrap_input.py
- """
- errorMsg = "You must install py4j Python module " \
- "(pip install py4j) to use Zeppelin dynamic forms features"
-
- def __init__(self):
- self.max_result = 1000
- self._displayhook = lambda *args: None
- self._setup_matplotlib()
-
- def input(self, name, defaultValue=""):
- print(self.errorMsg)
-
- def select(self, name, options, defaultValue=""):
- print(self.errorMsg)
-
- def checkbox(self, name, options, defaultChecked=[]):
- print(self.errorMsg)
-
- def show(self, p, **kwargs):
- if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
- self.show_matplotlib(p, **kwargs)
- elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
- # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
- # and so a dependency on pandas
- self.show_dataframe(p, **kwargs)
- elif hasattr(p, '__call__'):
- p() #error reporting
-
- def show_dataframe(self, df, show_index=False, **kwargs):
- """Pretty prints DF using Table Display System
- """
- limit = len(df) > self.max_result
- header_buf = StringIO("")
- if show_index:
- idx_name = str(df.index.name) if df.index.name is not None else ""
- header_buf.write(idx_name + "\t")
- header_buf.write(str(df.columns[0]))
- for col in df.columns[1:]:
- header_buf.write("\t")
- header_buf.write(str(col))
- header_buf.write("\n")
-
- body_buf = StringIO("")
- rows = df.head(self.max_result).values if limit else df.values
- index = df.index.values
- for idx, row in zip(index, rows):
- if show_index:
- body_buf.write("%html <strong>{}</strong>".format(idx))
- body_buf.write("\t")
- body_buf.write(str(row[0]))
- for cell in row[1:]:
- body_buf.write("\t")
- body_buf.write(str(cell))
- body_buf.write("\n")
- body_buf.seek(0); header_buf.seek(0)
- #TODO(bzz): fix it, so it shows red notice, as in Spark
- print("%table " + header_buf.read() + body_buf.read()) # +
- # ("\n<font color=red>Results are limited by {}.</font>" \
- # .format(self.max_result) if limit else "")
- #)
- body_buf.close(); header_buf.close()
-
- def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
- **kwargs):
- """Matplotlib show function
- """
- if fmt == "png":
- img = BytesIO()
- p.savefig(img, format=fmt)
- img_str = b"data:image/png;base64,"
- img_str += base64.b64encode(img.getvalue().strip())
- img_tag = "<img src={img} style='width={width};height:{height}'>"
- # Decoding is necessary for Python 3 compability
- img_str = img_str.decode("ascii")
- img_str = img_tag.format(img=img_str, width=width, height=height)
- elif fmt == "svg":
- img = StringIO()
- p.savefig(img, format=fmt)
- img_str = img.getvalue()
- else:
- raise ValueError("fmt must be 'png' or 'svg'")
-
- html = "%html <div style='width:{width};height:{height}'>{img}<div>"
- print(html.format(width=width, height=height, img=img_str))
- img.close()
-
- def configure_mpl(self, **kwargs):
- import mpl_config
- mpl_config.configure(**kwargs)
-
- def _setup_matplotlib(self):
- # If we don't have matplotlib installed don't bother continuing
- try:
- import matplotlib
- except ImportError:
- return
- # Make sure custom backends are available in the PYTHONPATH
- rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
- mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
- if mpl_path not in sys.path:
- sys.path.append(mpl_path)
-
- # Finally check if backend exists, and if so configure as appropriate
- try:
- matplotlib.use('module://backend_zinline')
- import backend_zinline
-
- # Everything looks good so make config assuming that we are using
- # an inline backend
- self._displayhook = backend_zinline.displayhook
- self.configure_mpl(width=600, height=400, dpi=72,
- fontsize=10, interactive=True, format='png')
- except ImportError:
- # Fall back to Agg if no custom backend installed
- matplotlib.use('Agg')
- warnings.warn("Unable to load inline matplotlib backend, "
- "falling back to Agg")
-
-
-z = PyZeppelinContext()
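`show_dataframe`, which is carried over unchanged into the new `zeppelin_python.py`, renders a DataFrame through Zeppelin's Table Display System. The printed text is a `%table` prefix, then a tab-separated header line, then one tab-separated line per row, with at most `max_result` rows. The sketch below produces the same text layout without pandas. It assumes plain lists of column names and row tuples, omits the `show_index` option, and `format_table` is a hypothetical name.

```python
def format_table(columns, rows, max_result=1000):
    """Build the '%table' text that show_dataframe prints.

    columns: list of column names; rows: list of row sequences.
    Only the first max_result rows are emitted, as in show_dataframe."""
    header = "\t".join(str(c) for c in columns)
    body = "".join("\t".join(str(cell) for cell in row) + "\n"
                   for row in rows[:max_result])
    return "%table " + header + "\n" + body


if __name__ == "__main__":
    print(format_table(["age", "job"], [(30, "admin."), (41, "technician")]))
```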
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/resources/bootstrap_input.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/bootstrap_input.py b/python/src/main/resources/bootstrap_input.py
deleted file mode 100644
index 6a0c544..0000000
--- a/python/src/main/resources/bootstrap_input.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from py4j.java_gateway import JavaGateway
-from py4j.java_gateway import java_import, JavaGateway, GatewayClient
-
-
-client = GatewayClient(port=%PORT%)
-gateway = JavaGateway(client)
-java_import(gateway.jvm, "org.apache.zeppelin.display.Input")
-
-
-class Py4jZeppelinContext(PyZeppelinContext):
- """A context impl that uses Py4j to communicate to JVM
- """
- def __init__(self, z):
- PyZeppelinContext.__init__(self)
- self.z = z
- self.paramOption = gateway.jvm.org.apache.zeppelin.display.Input.ParamOption
- self.javaList = gateway.jvm.java.util.ArrayList
- self.max_result = self.z.getMaxResult()
-
- def input(self, name, defaultValue=""):
- return self.z.getGui().input(name, defaultValue)
-
- def select(self, name, options, defaultValue=""):
- javaOptions = gateway.new_array(self.paramOption, len(options))
- i = 0
- for tuple in options:
- javaOptions[i] = self.paramOption(tuple[0], tuple[1])
- i += 1
- return self.z.getGui().select(name, defaultValue, javaOptions)
-
- def checkbox(self, name, options, defaultChecked=[]):
- javaOptions = gateway.new_array(self.paramOption, len(options))
- i = 0
- for tuple in options:
- javaOptions[i] = self.paramOption(tuple[0], tuple[1])
- i += 1
- javaDefaultCheck = self.javaList()
- for check in defaultChecked:
- javaDefaultCheck.append(check)
- return self.z.getGui().checkbox(name, javaDefaultCheck, javaOptions)
-
-
-z = Py4jZeppelinContext(gateway.entry_point)
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/resources/bootstrap_sql.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/bootstrap_sql.py b/python/src/main/resources/bootstrap_sql.py
deleted file mode 100644
index d8248c9..0000000
--- a/python/src/main/resources/bootstrap_sql.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Setup SQL over Pandas DataFrames
-# It requires next dependencies to be installed:
-# - pandas
-# - pandasql
-
-from __future__ import print_function
-
-try:
- from pandasql import sqldf
- pysqldf = lambda q: sqldf(q, globals())
-except ImportError:
- pysqldf = lambda q: print("Can not run SQL over Pandas DataFrame" +
- "Make sure 'pandas' and 'pandasql' libraries are installed")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/resources/interpreter-setting.json
----------------------------------------------------------------------
diff --git a/python/src/main/resources/interpreter-setting.json b/python/src/main/resources/interpreter-setting.json
index af0ba89..bc4d4ec 100644
--- a/python/src/main/resources/interpreter-setting.json
+++ b/python/src/main/resources/interpreter-setting.json
@@ -39,7 +39,7 @@
"className": "org.apache.zeppelin.python.PythonCondaInterpreter",
"properties": {
},
- "editor":{
+ "editor": {
"language": "sh",
"editOnDblClick": false
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/resources/python/bootstrap_sql.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/bootstrap_sql.py b/python/src/main/resources/python/bootstrap_sql.py
new file mode 100644
index 0000000..6f1ae81
--- /dev/null
+++ b/python/src/main/resources/python/bootstrap_sql.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Setup SQL over Pandas DataFrames
+# It requires next dependencies to be installed:
+# - pandas
+# - pandasql
+
+from __future__ import print_function
+
+try:
+ from pandasql import sqldf
+ pysqldf = lambda q: sqldf(q, globals())
+except ImportError:
+ pysqldf = lambda q: print("Cannot run SQL over Pandas DataFrame. " +
+ "Make sure 'pandas' and 'pandasql' libraries are installed")
+
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/main/resources/python/zeppelin_python.py
----------------------------------------------------------------------
diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py
new file mode 100644
index 0000000..0a36cba
--- /dev/null
+++ b/python/src/main/resources/python/zeppelin_python.py
@@ -0,0 +1,276 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os, sys, getopt, traceback, json, re
+import base64
+
+from py4j.java_gateway import java_import, JavaGateway, GatewayClient
+from py4j.protocol import Py4JJavaError, Py4JNetworkError
+import warnings
+import ast
+import signal
+
+from io import BytesIO
+try:
+ from StringIO import StringIO
+except ImportError:
+ from io import StringIO
+
+# for back compatibility
+
+class Logger(object):
+ def __init__(self):
+ pass
+
+ def write(self, message):
+ intp.appendOutput(message)
+
+ def reset(self):
+ pass
+
+ def flush(self):
+ pass
+
+
+class PyZeppelinContext(object):
+ """ If py4j is detected, this class will be overridden
+ with the implementation in bootstrap_input.py
+ """
+ errorMsg = "You must install py4j Python module " \
+ "(pip install py4j) to use Zeppelin dynamic forms features"
+
+ def __init__(self):
+ self.max_result = 1000
+ self._displayhook = lambda *args: None
+ self._setup_matplotlib()
+
+ def input(self, name, defaultValue=""):
+ print(self.errorMsg)
+
+ def select(self, name, options, defaultValue=""):
+ print(self.errorMsg)
+
+ def checkbox(self, name, options, defaultChecked=[]):
+ print(self.errorMsg)
+
+ def show(self, p, **kwargs):
+ if hasattr(p, '__name__') and p.__name__ == "matplotlib.pyplot":
+ self.show_matplotlib(p, **kwargs)
+ elif type(p).__name__ == "DataFrame": # does not play well with sub-classes
+ # `isinstance(p, DataFrame)` would req `import pandas.core.frame.DataFrame`
+ # and so a dependency on pandas
+ self.show_dataframe(p, **kwargs)
+ elif hasattr(p, '__call__'):
+ p() #error reporting
+
+ def show_dataframe(self, df, show_index=False, **kwargs):
+ """Pretty prints DF using Table Display System
+ """
+ limit = len(df) > self.max_result
+ header_buf = StringIO("")
+ if show_index:
+ idx_name = str(df.index.name) if df.index.name is not None else ""
+ header_buf.write(idx_name + "\t")
+ header_buf.write(str(df.columns[0]))
+ for col in df.columns[1:]:
+ header_buf.write("\t")
+ header_buf.write(str(col))
+ header_buf.write("\n")
+
+ body_buf = StringIO("")
+ rows = df.head(self.max_result).values if limit else df.values
+ index = df.index.values
+ for idx, row in zip(index, rows):
+ if show_index:
+ body_buf.write("%html <strong>{}</strong>".format(idx))
+ body_buf.write("\t")
+ body_buf.write(str(row[0]))
+ for cell in row[1:]:
+ body_buf.write("\t")
+ body_buf.write(str(cell))
+ body_buf.write("\n")
+ body_buf.seek(0); header_buf.seek(0)
+ #TODO(bzz): fix it, so it shows red notice, as in Spark
+ print("%table " + header_buf.read() + body_buf.read()) # +
+ # ("\n<font color=red>Results are limited by {}.</font>" \
+ # .format(self.max_result) if limit else "")
+ #)
+ body_buf.close(); header_buf.close()
+
+ def show_matplotlib(self, p, fmt="png", width="auto", height="auto",
+ **kwargs):
+ """Matplotlib show function
+ """
+ if fmt == "png":
+ img = BytesIO()
+ p.savefig(img, format=fmt)
+ img_str = b"data:image/png;base64,"
+ img_str += base64.b64encode(img.getvalue().strip())
+ img_tag = "<img src={img} style='width:{width};height:{height}'>"
+ # Decoding is necessary for Python 3 compatibility
+ img_str = img_str.decode("ascii")
+ img_str = img_tag.format(img=img_str, width=width, height=height)
+ elif fmt == "svg":
+ img = StringIO()
+ p.savefig(img, format=fmt)
+ img_str = img.getvalue()
+ else:
+ raise ValueError("fmt must be 'png' or 'svg'")
+
+ html = "%html <div style='width:{width};height:{height}'>{img}</div>"
+ print(html.format(width=width, height=height, img=img_str))
+ img.close()
+
+ def configure_mpl(self, **kwargs):
+ import mpl_config
+ mpl_config.configure(**kwargs)
+
+ def _setup_matplotlib(self):
+ # If we don't have matplotlib installed don't bother continuing
+ try:
+ import matplotlib
+ except ImportError:
+ return
+ # Make sure custom backends are available in the PYTHONPATH
+ rootdir = os.environ.get('ZEPPELIN_HOME', os.getcwd())
+ mpl_path = os.path.join(rootdir, 'interpreter', 'lib', 'python')
+ if mpl_path not in sys.path:
+ sys.path.append(mpl_path)
+
+ # Finally check if backend exists, and if so configure as appropriate
+ try:
+ matplotlib.use('module://backend_zinline')
+ import backend_zinline
+
+ # Everything looks good so make config assuming that we are using
+ # an inline backend
+ self._displayhook = backend_zinline.displayhook
+ self.configure_mpl(width=600, height=400, dpi=72,
+ fontsize=10, interactive=True, format='png')
+ except ImportError:
+ # Fall back to Agg if no custom backend installed
+ matplotlib.use('Agg')
+ warnings.warn("Unable to load inline matplotlib backend, "
+ "falling back to Agg")
+
+
+def handler_stop_signals(sig, frame):
+ sys.exit("Got signal : " + str(sig))
+
+
+signal.signal(signal.SIGINT, handler_stop_signals)
+
+host = "127.0.0.1"
+if len(sys.argv) >= 3:
+ host = sys.argv[2]
+
+client = GatewayClient(address=host, port=int(sys.argv[1]))
+
+#gateway = JavaGateway(client, auto_convert = True)
+gateway = JavaGateway(client)
+
+intp = gateway.entry_point
+intp.onPythonScriptInitialized(os.getpid())
+
+z = PyZeppelinContext()
+z._setup_matplotlib()
+
+output = Logger()
+sys.stdout = output
+#sys.stderr = output
+
+while True:
+ req = intp.getStatements()
+ if req is None:
+ break
+
+ try:
+ stmts = req.statements().split("\n")
+ final_code = []
+
+ # Get post-execute hooks
+ try:
+ global_hook = intp.getHook('post_exec_dev')
+ except:
+ global_hook = None
+
+ try:
+ user_hook = z.getHook('post_exec')
+ except:
+ user_hook = None
+
+ nhooks = 0
+ for hook in (global_hook, user_hook):
+ if hook:
+ nhooks += 1
+
+ for s in stmts:
+ if s is None:
+ continue
+
+ # skip blank lines and comment-only lines
+ s_stripped = s.strip()
+ if len(s_stripped) == 0 or s_stripped.startswith("#"):
+ continue
+
+ final_code.append(s)
+
+ if final_code:
+ # parse the statements into an AST; all but the last are run in exec mode and
+ # the last in single mode, so that a trailing expression's value is printed to stdout
+ code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1)
+
+ to_run_hooks = []
+ if (nhooks > 0):
+ to_run_hooks = code.body[-nhooks:]
+
+ to_run_exec, to_run_single = (code.body[:-(nhooks + 1)],
+ [code.body[-(nhooks + 1)]])
+
+ try:
+ for node in to_run_exec:
+ mod = ast.Module([node])
+ code = compile(mod, '<stdin>', 'exec')
+ exec(code)
+
+ for node in to_run_single:
+ mod = ast.Interactive([node])
+ code = compile(mod, '<stdin>', 'single')
+ exec(code)
+
+ for node in to_run_hooks:
+ mod = ast.Module([node])
+ code = compile(mod, '<stdin>', 'exec')
+ exec(code)
+ except:
+ raise Exception(traceback.format_exc())
+
+ intp.setStatementsFinished("", False)
+ except Py4JJavaError:
+ excInnerError = traceback.format_exc() # format_tb() does not return the inner exception
+ innerErrorStart = excInnerError.find("Py4JJavaError:")
+ if innerErrorStart > -1:
+ excInnerError = excInnerError[innerErrorStart:]
+ intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True)
+ except Py4JNetworkError:
+ # lost connection from gateway server. exit
+ sys.exit(1)
+ except:
+ intp.setStatementsFinished(traceback.format_exc(), True)
+
+ output.reset()
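The main loop of the new `zeppelin_python.py` parses a paragraph into an AST. It runs every top-level statement except the last in `exec` mode and runs the last one in `single` mode, so a trailing bare expression is echoed the way an interactive prompt would echo it. Because code is no longer fed line by line to a REPL, no `...` continuation prompts appear. In parallel, `sys.stdout` is replaced by a `Logger` whose `write()` forwards each chunk to the JVM through `intp.appendOutput`, which is what lets output stream while a paragraph is still running.

The self-contained Python 3 sketch below covers both ideas. It assumes a plain callback `emit` in place of the py4j call, and `run_paragraph` and `ForwardingWriter` are hypothetical names. Hooks and error reporting are left out, and the leading statements are compiled together rather than one by one. `ast.Module` is given `type_ignores=[]` because Python 3.8+ requires that field.

```python
import ast
import sys


class ForwardingWriter(object):
    """File-like object that hands every write to a callback immediately,
    standing in for the Logger class that calls intp.appendOutput."""

    def __init__(self, emit):
        self.emit = emit

    def write(self, message):
        self.emit(message)

    def flush(self):
        pass


def run_paragraph(source, emit, namespace=None):
    """Execute source, echoing the value of a trailing expression."""
    if namespace is None:
        namespace = {}
    tree = ast.parse(source, "<stdin>", "exec")
    if not tree.body:  # empty or comment-only paragraph
        return namespace
    leading, last = tree.body[:-1], tree.body[-1]

    saved_stdout = sys.stdout
    sys.stdout = ForwardingWriter(emit)
    try:
        if leading:
            module = ast.Module(body=leading, type_ignores=[])
            exec(compile(module, "<stdin>", "exec"), namespace)
        # 'single' mode prints a non-None expression value via sys.displayhook.
        interactive = ast.Interactive(body=[last])
        exec(compile(interactive, "<stdin>", "single"), namespace)
    finally:
        sys.stdout = saved_stdout  # restore even if the paragraph raised
    return namespace


if __name__ == "__main__":
    chunks = []
    run_paragraph("def fn():\n    print('hi')\nfn()\n1 + 2", chunks.append)
    assert "".join(chunks) == "hi\n3\n"
```

Forwarding each `write()` call instead of buffering until the statement finishes is the design choice behind the streaming fix.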
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java
index c6d2a84..28d47e0 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonCondaInterpreterTest.java
@@ -1,21 +1,23 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.zeppelin.python;
+
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
@@ -132,4 +134,6 @@ public class PythonCondaInterpreterTest {
null,
new InterpreterOutput(null));
}
+
+
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java
index b4d3be2..566b5e0 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonDockerInterpreterTest.java
@@ -21,8 +21,11 @@ import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
@@ -46,8 +49,12 @@ public class PythonDockerInterpreterTest {
group.put("note", Arrays.asList(python, docker));
python.setInterpreterGroup(group);
docker.setInterpreterGroup(group);
+
doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString());
doReturn(python).when(docker).getPythonInterpreter();
+ doReturn("/scriptpath/zeppelin_python.py").when(python).getScriptPath();
+
+ docker.open();
}
@Test
@@ -57,7 +64,7 @@ public class PythonDockerInterpreterTest {
verify(python, times(1)).open();
verify(python, times(1)).close();
verify(docker, times(1)).pull(any(InterpreterOutput.class), anyString());
- verify(python).setPythonCommand("docker run -i --rm env python -iu");
+ verify(python).setPythonCommand(Mockito.matches("docker run -i --rm -v.*"));
}
@Test
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
index 75604dc..8b48b24 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterMatplotlibTest.java
@@ -1,25 +1,22 @@
-
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.zeppelin.python;
-import java.util.*;
-
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
@@ -29,34 +26,27 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
-/**
- * In order for this test to work, test env must have installed:
- * <ol>
- * - <li>Python</li>
- * - <li>Matplotlib</li>
- * <ol>
- *
- * Your PYTHONPATH should also contain the directory of the Matplotlib
- * backend files. Usually these can be found in $ZEPPELIN_HOME/interpreter/lib/python.
- *
- * To run manually on such environment, use:
- * <code>
- * mvn -Dpython.test.exclude='' test -pl python -am
- * </code>
- */
-public class PythonInterpreterMatplotlibTest {
+import static org.junit.Assert.*;
+public class PythonInterpreterMatplotlibTest implements InterpreterOutputListener {
private InterpreterGroup intpGroup;
private PythonInterpreter python;
private InterpreterContext context;
+ InterpreterOutput out;
@Before
public void setUp() throws Exception {
@@ -68,16 +58,27 @@ public class PythonInterpreterMatplotlibTest {
python = new PythonInterpreter(p);
python.setInterpreterGroup(intpGroup);
- python.open();
List<Interpreter> interpreters = new LinkedList<>();
interpreters.add(python);
intpGroup.put("note", interpreters);
- context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
- new HashMap<String, Object>(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null), null,
- new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+ out = new InterpreterOutput(this);
+
+ context = new InterpreterContext("note", "id", null, "title", "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("id"),
+ new LinkedList<InterpreterContextRunner>(),
+ out);
+ python.open();
+ }
+
+ @After
+ public void afterTest() throws IOException {
+ python.close();
}
@Test
@@ -85,14 +86,14 @@ public class PythonInterpreterMatplotlibTest {
// matplotlib
InterpreterResult ret = python.interpret("import matplotlib", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
-
+
// inline backend
ret = python.interpret("import backend_zinline", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
- public void showPlot() {
+ public void showPlot() throws IOException {
// Simple plot test
InterpreterResult ret;
ret = python.interpret("import matplotlib.pyplot as plt", context);
@@ -100,15 +101,16 @@ public class PythonInterpreterMatplotlibTest {
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret = python.interpret("plt.show()", context);
- assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType());
- assertTrue(ret.message().get(0).getData().contains("data:image/png;base64"));
- assertTrue(ret.message().get(0).getData().contains("<div>"));
+ assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Type.TEXT, out.getOutputAt(0).getType());
+ assertEquals(new String(out.getOutputAt(1).toByteArray()), InterpreterResult.Type.HTML, out.getOutputAt(1).getType());
+ assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("data:image/png;base64"));
+ assertTrue(new String(out.getOutputAt(1).toByteArray()).contains("<div>"));
}
@Test
// Test for when configuration is set to auto-close figures after show().
- public void testClose() {
+ public void testClose() throws IOException {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
@@ -116,25 +118,33 @@ public class PythonInterpreterMatplotlibTest {
ret = python.interpret("z.configure_mpl(interactive=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret1 = python.interpret("plt.show()", context);
-
+
// Second call to show() should print nothing, and Type should be TEXT.
// This is because when close=True, there should be no living instances
// of FigureManager, causing show() to return before setting the output
// type to HTML.
ret = python.interpret("plt.show()", context);
+
+ assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(0, ret.message().size());
-
+
// Now test that new plot is drawn. It should be identical to the
// previous one.
ret = python.interpret("plt.plot([1, 2, 3])", context);
+ String msg1 = new String(out.getOutputAt(0).toByteArray());
+ InterpreterResult.Type type1 = out.getOutputAt(0).getType();
+
ret2 = python.interpret("plt.show()", context);
- assertEquals(ret1.message().get(0).getType(), ret2.message().get(0).getType());
- assertEquals(ret1.message().get(0).getData(), ret2.message().get(0).getData());
+ String msg2 = new String(out.getOutputAt(0).toByteArray());
+ InterpreterResult.Type type2 = out.getOutputAt(0).getType();
+
+ assertEquals(msg1, msg2);
+ assertEquals(type1, type2);
}
-
+
@Test
// Test for when configuration is set to not auto-close figures after show().
- public void testNoClose() {
+ public void testNoClose() throws IOException {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
@@ -142,19 +152,39 @@ public class PythonInterpreterMatplotlibTest {
ret = python.interpret("z.configure_mpl(interactive=False, close=False)", context);
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret1 = python.interpret("plt.show()", context);
-
+
// Second call to show() should print nothing, and Type should be HTML.
// This is because when close=False, there should be living instances
// of FigureManager, causing show() to set the output
// type to HTML even though the figure is inactive.
ret = python.interpret("plt.show()", context);
- assertEquals("", ret.message().get(0).getData());
-
+ String msg1 = new String(out.getOutputAt(0).toByteArray());
+ assertNotSame("", msg1);
+
// Now test that plot can be reshown if it is updated. It should be
// different from the previous one because it will plot the same line
// again but in a different color.
ret = python.interpret("plt.plot([1, 2, 3])", context);
+ msg1 = new String(out.getOutputAt(1).toByteArray());
ret2 = python.interpret("plt.show()", context);
- assertNotSame(ret1.message().get(0).getData(), ret2.message().get(0).getData());
+ String msg2 = new String(out.getOutputAt(1).toByteArray());
+
+ assertNotSame(msg1, msg2);
+ }
+
+
+ @Override
+ public void onUpdateAll(InterpreterOutput out) {
+
+ }
+
+ @Override
+ public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+
+ }
+
+ @Override
+ public void onUpdate(int index, InterpreterResultMessageOutput out) {
+
}
}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/287ffd50/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
----------------------------------------------------------------------
diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
index 86fb22b..f200a0a 100644
--- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
+++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterPandasSqlTest.java
@@ -21,13 +21,16 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
+import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@@ -35,7 +38,10 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -53,13 +59,14 @@ import org.junit.Test;
* mvn -Dpython.test.exclude='' test -pl python -am
* </code>
*/
-public class PythonInterpreterPandasSqlTest {
+public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener {
private InterpreterGroup intpGroup;
private PythonInterpreterPandasSql sql;
private PythonInterpreter python;
private InterpreterContext context;
+ InterpreterOutput out;
@Before
public void setUp() throws Exception {
@@ -78,14 +85,27 @@ public class PythonInterpreterPandasSqlTest {
intpGroup.put("note", Arrays.asList(python, sql));
- context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
- new HashMap<String, Object>(), new GUI(),
- new AngularObjectRegistry(intpGroup.getId(), null), null,
- new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
+ out = new InterpreterOutput(this);
+
+ context = new InterpreterContext("note", "id", null, "title", "text",
+ new AuthenticationInfo(),
+ new HashMap<String, Object>(),
+ new GUI(),
+ new AngularObjectRegistry(intpGroup.getId(), null),
+ new LocalResourcePool("id"),
+ new LinkedList<InterpreterContextRunner>(),
+ out);
+
+ // to make sure python is running.
+ InterpreterResult ret = python.interpret("\n", context);
+ assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
- //important to be last step
sql.open();
- //it depends on python interpreter presence in the same group
+ }
+
+ @After
+ public void afterTest() throws IOException {
+ sql.close();
}
@Test
@@ -97,23 +117,15 @@ public class PythonInterpreterPandasSqlTest {
@Test
public void errorMessageIfDependenciesNotInstalled() {
InterpreterResult ret;
- // given
- ret = python.interpret(
- "pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
- context);
- assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
-
- // when
ret = sql.interpret("SELECT * from something", context);
- // then
assertNotNull(ret);
- assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
- assertTrue(ret.message().get(0).getData().contains("dependency is not installed"));
+ assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.ERROR, ret.code());
+ assertTrue(ret.message().get(0).getData().contains("no such table: something"));
}
@Test
- public void sqlOverTestDataPrintsTable() {
+ public void sqlOverTestDataPrintsTable() throws IOException {
InterpreterResult ret;
// given
//String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
@@ -121,36 +133,34 @@ public class PythonInterpreterPandasSqlTest {
ret = python.interpret("import numpy as np", context);
// DataFrame df2 \w test data
ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
- "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
+ "'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
//when
ret = sql.interpret("select name, age from df2 where age < 40", context);
//then
- assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
- //assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
- assertTrue(ret.message().get(0).getData().indexOf("moon\t33") > 0);
- assertTrue(ret.message().get(0).getData().indexOf("park\t34") > 0);
+ assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType());
+ assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("moon\t33") > 0);
+ assertTrue(new String(out.getOutputAt(0).toByteArray()).indexOf("park\t34") > 0);
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from df2", context).code());
}
@Test
- public void badSqlSyntaxFails() {
+ public void badSqlSyntaxFails() throws IOException {
//when
InterpreterResult ret = sql.interpret("select wrong syntax", context);
//then
assertNotNull("Interpreter returned 'null'", ret);
- //System.out.println("\nInterpreter response: \n" + ret.message());
assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
- assertTrue(ret.message().get(0).getData().length() > 0);
+ assertTrue(out.toInterpreterResultMessage().size() == 0);
}
@Test
- public void showDataFrame() {
+ public void showDataFrame() throws IOException {
InterpreterResult ret;
ret = python.interpret("import pandas as pd", context);
ret = python.interpret("import numpy as np", context);
@@ -165,11 +175,25 @@ public class PythonInterpreterPandasSqlTest {
ret = python.interpret("z.show(df1, show_index=True)", context);
// then
- assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
- assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
- assertTrue(ret.message().get(0).getData().indexOf("index_name") == 0);
- assertTrue(ret.message().get(0).getData().indexOf("13") > 0);
- assertTrue(ret.message().get(0).getData().indexOf("nan") > 0);
- assertTrue(ret.message().get(0).getData().indexOf("6.7") > 0);
+ assertEquals(new String(out.getOutputAt(0).toByteArray()), InterpreterResult.Code.SUCCESS, ret.code());
+ assertEquals(new String(out.getOutputAt(0).toByteArray()), Type.TABLE, out.getOutputAt(0).getType());
+ assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("index_name"));
+ assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("nan"));
+ assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("6.7"));
+ }
+
+ @Override
+ public void onUpdateAll(InterpreterOutput out) {
+
+ }
+
+ @Override
+ public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
+
+ }
+
+ @Override
+ public void onUpdate(int index, InterpreterResultMessageOutput out) {
+
}
-}
+}
\ No newline at end of file