You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2020/12/01 14:35:16 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5144] Jupyter conda
python packages
This is an automated email from the ASF dual-hosted git repository.
pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 2735041 [ZEPPELIN-5144] Jupyter conda python packages
2735041 is described below
commit 2735041ad667c72003e68edd2e89b46d11613dd1
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Nov 25 17:21:44 2020 +0100
[ZEPPELIN-5144] Jupyter conda python packages
### What is this PR for?
This PR includes:
- `checkKernelPrerequisite` no longer fails when the required Python packages are installed with conda (`pip freeze` then lists them as `name @ file:///...` instead of `name==version`)
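- Fixes the following error from the Python kernel server (`kernel_server.py`) by checking that the execute reply's content contains a 'payload' entry before iterating over it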
```
ERROR [2020-11-24 10:50:23,995] ({grpc-default-executor-2} JupyterKernelClient.java[onError]:231) - Fail to call IPython grpc
io.grpc.StatusRuntimeException: UNKNOWN: Exception iterating responses: 'payload'
```
### What type of PR is it?
- Bug Fix
### Todos
* [x] - Test with CI
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5144
### How should this be tested?
* GitHub-Action: https://github.com/Reamer/zeppelin/runs/1479335046
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Philipp Dallig <ph...@gmail.com>
Closes #3982 from Reamer/jupyter_conda and squashes the following commits:
8cf446429 [Philipp Dallig] Check if payload is available
2f6838a6f [Philipp Dallig] Allow installation of Python requirements with conda
16b52c2f6 [Philipp Dallig] Some style changes
(cherry picked from commit e5b6850667ac0f67bef9e1d0d410f37ded2e01ed)
Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
.../java/org/apache/zeppelin/r/IRInterpreter.java | 19 ++++---------
.../org/apache/zeppelin/r/ShinyInterpreter.java | 2 +-
.../zeppelin/jupyter/JupyterKernelClient.java | 6 ++---
.../zeppelin/jupyter/JupyterKernelInterpreter.java | 31 ++++++++++++++--------
.../main/resources/grpc/jupyter/kernel_server.py | 7 ++---
5 files changed, 33 insertions(+), 32 deletions(-)
diff --git a/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
index 2b6051b..0dcb836 100644
--- a/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
+++ b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
@@ -36,6 +36,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Properties;
@@ -126,14 +127,14 @@ public class IRInterpreter extends JupyterKernelInterpreter {
String timeout = getProperty("spark.r.backendConnectionTimeout", "6000");
InputStream input =
getClass().getClassLoader().getResourceAsStream("R/zeppelin_isparkr.R");
- String code = IOUtils.toString(input)
+ String code = IOUtils.toString(input, StandardCharsets.UTF_8)
.replace("${Port}", sparkRBackend.port() + "")
.replace("${version}", sparkVersion() + "")
.replace("${libPath}", "\"" + SparkRUtils.getSparkRLib(isSparkSupported()) + "\"")
.replace("${timeout}", timeout)
.replace("${isSparkSupported}", "\"" + isSparkSupported() + "\"")
.replace("${authSecret}", "\"" + sparkRBackend.socketSecret() + "\"");
- LOGGER.debug("Init IRKernel via script:\n" + code);
+ LOGGER.debug("Init IRKernel via script:\n{}", code);
ExecuteResponse response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode(code).build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
@@ -167,22 +168,12 @@ public class IRInterpreter extends JupyterKernelInterpreter {
public InterpreterResult shinyServer(String st,
InterpreterContext context) throws InterpreterException {
File serverFile = new File(shinyAppFolder, "server.R");
- FileWriter writer = null;
- try {
- writer = new FileWriter(serverFile);
+ try (FileWriter writer = new FileWriter(serverFile);){
IOUtils.copy(new StringReader(st), writer);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Write server.R to "
+ shinyAppFolder.getAbsolutePath() + " successfully.");
} catch (IOException e) {
throw new InterpreterException("Fail to write shiny file server.R", e);
- } finally {
- if (writer != null) {
- try {
- writer.close();
- } catch (IOException e) {
- throw new InterpreterException(e);
- }
- }
}
}
@@ -198,7 +189,7 @@ public class IRInterpreter extends JupyterKernelInterpreter {
builder.append("runApp(appDir='" + shinyAppFolder.getAbsolutePath() + "', " +
"port=" + port + ", host='" + host + "', launch.browser=FALSE)");
// shiny app will launch and block there until user cancel the paragraph.
- LOGGER.info("Run shiny app code: " + builder.toString());
+ LOGGER.info("Run shiny app code: {}", builder);
return internalInterpret(builder.toString(), context);
} finally {
getKernelProcessLauncher().setRedirectedContext(null);
diff --git a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
index 032c522..499ef4a 100644
--- a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
+++ b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
@@ -69,7 +69,7 @@ public class ShinyInterpreter extends AbstractInterpreter {
@Override
public void close() throws InterpreterException {
for (Map.Entry<String,IRInterpreter> entry : shinyIRInterpreters.entrySet()) {
- LOGGER.info("Closing IRInterpreter: " + entry.getKey());
+ LOGGER.info("Closing IRInterpreter: {}", entry.getKey());
// Stop shiny app first otherwise the R process can not be terminated.
entry.getValue().cancel(InterpreterContext.get());
entry.getValue().close();
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
index 1065c41..51b3b9a 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
@@ -55,7 +55,7 @@ public class JupyterKernelClient {
private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelClient.class.getName());
// used for matching shiny url
- private static Pattern ShinyListeningPattern =
+ private static final Pattern SHINY_LISTENING_PATTERN =
Pattern.compile(".*Listening on (http:\\S*).*", Pattern.DOTALL);
private final ManagedChannel channel;
@@ -112,10 +112,10 @@ public class JupyterKernelClient {
if (intpClassName != null &&
(intpClassName.equals("org.apache.zeppelin.r.ShinyInterpreter") ||
intpClassName.equals("org.apache.zeppelin.spark.SparkShinyInterpreter"))) {
- Matcher matcher = ShinyListeningPattern.matcher(response);
+ Matcher matcher = SHINY_LISTENING_PATTERN.matcher(response);
if (matcher.matches()) {
String url = matcher.group(1);
- LOGGER.info("Matching shiny app url: " + url);
+ LOGGER.info("Matching shiny app url: {}", url);
context.out.clear();
String defaultHeight = properties.getProperty("zeppelin.R.shiny.iframe_height", "500px");
String height = context.getLocalProperties().getOrDefault("height", defaultHeight);
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
index 98cf4bd..1269bda 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
@@ -49,6 +49,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
@@ -110,7 +111,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
return;
}
pythonExecutable = getProperty("zeppelin.python", "python");
- LOGGER.info("Python Exec: " + pythonExecutable);
+ LOGGER.info("Python Exec: {}", pythonExecutable);
String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable);
if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
throw new InterpreterException("Kernel prerequisite is not meet: " +
@@ -120,11 +121,11 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
getProperty("zeppelin.jupyter.kernel.launch.timeout", "30000"));
this.z = buildZeppelinContext();
int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
- int message_size = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size",
+ int messageSize = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size",
32 * 1024 * 1024 + ""));
jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1",
- kernelPort).usePlaintext(true).maxInboundMessageSize(message_size),
+ kernelPort).usePlaintext(true).maxInboundMessageSize(messageSize),
getProperties(), kernel);
launchJupyterKernel(kernelPort);
} catch (Exception e) {
@@ -153,17 +154,24 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
int ret = proc.waitFor();
if (ret != 0) {
try (FileInputStream in = new FileInputStream(stderrFile)) {
- return "Fail to run pip freeze.\n" + IOUtils.toString(in);
+ return "Fail to run pip freeze.\n" + IOUtils.toString(in, StandardCharsets.UTF_8);
}
}
try (FileInputStream in = new FileInputStream(stdoutFile)) {
- String freezeOutput = IOUtils.toString(in);
+ String freezeOutput = IOUtils.toString(in, StandardCharsets.UTF_8);
for (String packageName : getRequiredPackages()) {
- if (!freezeOutput.contains(packageName + "=")) {
+ /**
+ * Example line, if the Python package is installed with pip:
+ * grpcio==1.18.0
+ * Example line, if the Python package is installed with conda:
+ * grpcio @ file:///home/conda/feedstock_root/build_artifacts/grpcio_1604365513151/work
+ */
+ if (!freezeOutput.contains(packageName + "=") &&
+ !freezeOutput.contains(packageName + " ")) {
return packageName + " is not installed.";
}
}
- LOGGER.info("Prerequisite for kernel " + getKernelName() + " is met");
+ LOGGER.info("Prerequisite for kernel {} is met", getKernelName());
}
} catch (Exception e) {
LOGGER.warn("Fail to checkKernelPrerequisite", e);
@@ -177,7 +185,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
private void launchJupyterKernel(int kernelPort)
throws IOException {
- LOGGER.info("Launching Jupyter Kernel at port: " + kernelPort);
+ LOGGER.info("Launching Jupyter Kernel at port: {}", kernelPort);
// copy the python scripts to a temp directory, then launch jupyter kernel in that folder
this.kernelWorkDir = Files.createTempDirectory(
"zeppelin_jupyter_kernel_" + getKernelName()).toFile();
@@ -191,7 +199,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
CommandLine cmd = CommandLine.parse(pythonExecutable);
cmd.addArgument(kernelWorkDir.getAbsolutePath() + "/kernel_server.py");
cmd.addArgument(getKernelName());
- cmd.addArgument(kernelPort + "");
+ cmd.addArgument(String.valueOf(kernelPort));
Map<String, String> envs = setupKernelEnv();
jupyterKernelProcessLauncher = new JupyterKernelProcessLauncher(cmd, envs);
@@ -289,7 +297,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
- LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor);
+ LOGGER.debug("Call completion for: {}, cursor: {}", buf, cursor);
List<InterpreterCompletion> completions = new ArrayList<>();
CompletionResponse response =
jupyterKernelClient.complete(
@@ -301,12 +309,13 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
if (lastIndexOfDot != -1) {
match = match.substring(lastIndexOfDot + 1);
}
- LOGGER.debug("Candidate completion: " + match);
+ LOGGER.debug("Candidate completion: {}", match);
completions.add(new InterpreterCompletion(match, match, ""));
}
return completions;
}
+ @Override
public ZeppelinContext getZeppelinContext() {
return z;
}
diff --git a/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py b/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py
index 5643fab..0c7a0d1 100644
--- a/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py
+++ b/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py
@@ -132,9 +132,10 @@ class KernelServer(kernel_pb2_grpc.JupyterKernelServicer):
output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
if payload_reply:
result = []
- for payload in payload_reply[0]['content']['payload']:
- if payload['data']['text/plain']:
- result.append(payload['data']['text/plain'])
+ if 'payload' in payload_reply[0]['content']:
+ for payload in payload_reply[0]['content']['payload']:
+ if payload['data']['text/plain']:
+ result.append(payload['data']['text/plain'])
if result:
yield kernel_pb2.ExecuteResponse(status=kernel_pb2.SUCCESS,
type=kernel_pb2.TEXT,