You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by pd...@apache.org on 2020/12/01 14:35:16 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5144] Jupyter conda python packages

This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 2735041  [ZEPPELIN-5144] Jupyter conda python packages
2735041 is described below

commit 2735041ad667c72003e68edd2e89b46d11613dd1
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Wed Nov 25 17:21:44 2020 +0100

    [ZEPPELIN-5144] Jupyter conda python packages
    
    ### What is this PR for?
    This PR includes:
     - checkKernelPrerequisite no longer fails when the required Python packages are installed with conda
     - Fixes the following Python bug with an additional if check
    ```
    ERROR [2020-11-24 10:50:23,995] ({grpc-default-executor-2} JupyterKernelClient.java[onError]:231) - Fail to call IPython grpc
    io.grpc.StatusRuntimeException: UNKNOWN: Exception iterating responses: 'payload'
    ```
    
    ### What type of PR is it?
     - Bug Fix
    
    ### Todos
    * [x] - Test with CI
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5144
    
    ### How should this be tested?
    * GitHub-Action: https://github.com/Reamer/zeppelin/runs/1479335046
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #3982 from Reamer/jupyter_conda and squashes the following commits:
    
    8cf446429 [Philipp Dallig] Check if payload is available
    2f6838a6f [Philipp Dallig] Allow installation of Python requirements with conda
    16b52c2f6 [Philipp Dallig] Some style changes
    
    (cherry picked from commit e5b6850667ac0f67bef9e1d0d410f37ded2e01ed)
    Signed-off-by: Philipp Dallig <ph...@gmail.com>
---
 .../java/org/apache/zeppelin/r/IRInterpreter.java  | 19 ++++---------
 .../org/apache/zeppelin/r/ShinyInterpreter.java    |  2 +-
 .../zeppelin/jupyter/JupyterKernelClient.java      |  6 ++---
 .../zeppelin/jupyter/JupyterKernelInterpreter.java | 31 ++++++++++++++--------
 .../main/resources/grpc/jupyter/kernel_server.py   |  7 ++---
 5 files changed, 33 insertions(+), 32 deletions(-)

diff --git a/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
index 2b6051b..0dcb836 100644
--- a/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
+++ b/rlang/src/main/java/org/apache/zeppelin/r/IRInterpreter.java
@@ -36,6 +36,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.Properties;
 
@@ -126,14 +127,14 @@ public class IRInterpreter extends JupyterKernelInterpreter {
     String timeout = getProperty("spark.r.backendConnectionTimeout", "6000");
     InputStream input =
             getClass().getClassLoader().getResourceAsStream("R/zeppelin_isparkr.R");
-    String code = IOUtils.toString(input)
+    String code = IOUtils.toString(input, StandardCharsets.UTF_8)
             .replace("${Port}", sparkRBackend.port() + "")
             .replace("${version}", sparkVersion() + "")
             .replace("${libPath}", "\"" + SparkRUtils.getSparkRLib(isSparkSupported()) + "\"")
             .replace("${timeout}", timeout)
             .replace("${isSparkSupported}", "\"" + isSparkSupported() + "\"")
             .replace("${authSecret}", "\"" + sparkRBackend.socketSecret() + "\"");
-    LOGGER.debug("Init IRKernel via script:\n" + code);
+    LOGGER.debug("Init IRKernel via script:\n{}", code);
     ExecuteResponse response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
             .setCode(code).build());
     if (response.getStatus() != ExecuteStatus.SUCCESS) {
@@ -167,22 +168,12 @@ public class IRInterpreter extends JupyterKernelInterpreter {
   public InterpreterResult shinyServer(String st,
                                        InterpreterContext context) throws InterpreterException {
     File serverFile = new File(shinyAppFolder, "server.R");
-    FileWriter writer = null;
-    try {
-      writer = new FileWriter(serverFile);
+    try (FileWriter writer = new FileWriter(serverFile);){
       IOUtils.copy(new StringReader(st), writer);
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Write server.R to "
               + shinyAppFolder.getAbsolutePath() + " successfully.");
     } catch (IOException e) {
       throw new InterpreterException("Fail to write shiny file server.R", e);
-    } finally {
-      if (writer != null) {
-        try {
-          writer.close();
-        } catch (IOException e) {
-          throw new InterpreterException(e);
-        }
-      }
     }
   }
 
@@ -198,7 +189,7 @@ public class IRInterpreter extends JupyterKernelInterpreter {
       builder.append("runApp(appDir='" + shinyAppFolder.getAbsolutePath() + "', " +
               "port=" + port + ", host='" + host + "', launch.browser=FALSE)");
       // shiny app will launch and block there until user cancel the paragraph.
-      LOGGER.info("Run shiny app code: " + builder.toString());
+      LOGGER.info("Run shiny app code: {}", builder);
       return internalInterpret(builder.toString(), context);
     } finally {
       getKernelProcessLauncher().setRedirectedContext(null);
diff --git a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
index 032c522..499ef4a 100644
--- a/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
+++ b/rlang/src/main/java/org/apache/zeppelin/r/ShinyInterpreter.java
@@ -69,7 +69,7 @@ public class ShinyInterpreter extends AbstractInterpreter {
   @Override
   public void close() throws InterpreterException {
     for (Map.Entry<String,IRInterpreter> entry : shinyIRInterpreters.entrySet()) {
-      LOGGER.info("Closing IRInterpreter: " + entry.getKey());
+      LOGGER.info("Closing IRInterpreter: {}", entry.getKey());
       // Stop shiny app first otherwise the R process can not be terminated.
       entry.getValue().cancel(InterpreterContext.get());
       entry.getValue().close();
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
index 1065c41..51b3b9a 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java
@@ -55,7 +55,7 @@ public class JupyterKernelClient {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelClient.class.getName());
   // used for matching shiny url
-  private static Pattern ShinyListeningPattern =
+  private static final Pattern SHINY_LISTENING_PATTERN =
           Pattern.compile(".*Listening on (http:\\S*).*", Pattern.DOTALL);
 
   private final ManagedChannel channel;
@@ -112,10 +112,10 @@ public class JupyterKernelClient {
     if (intpClassName != null &&
             (intpClassName.equals("org.apache.zeppelin.r.ShinyInterpreter") ||
                     intpClassName.equals("org.apache.zeppelin.spark.SparkShinyInterpreter"))) {
-      Matcher matcher = ShinyListeningPattern.matcher(response);
+      Matcher matcher = SHINY_LISTENING_PATTERN.matcher(response);
       if (matcher.matches()) {
         String url = matcher.group(1);
-        LOGGER.info("Matching shiny app url: " + url);
+        LOGGER.info("Matching shiny app url: {}", url);
         context.out.clear();
         String defaultHeight = properties.getProperty("zeppelin.R.shiny.iframe_height", "500px");
         String height = context.getLocalProperties().getOrDefault("height", defaultHeight);
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
index 98cf4bd..1269bda 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
@@ -49,6 +49,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.List;
@@ -110,7 +111,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
         return;
       }
       pythonExecutable = getProperty("zeppelin.python", "python");
-      LOGGER.info("Python Exec: " + pythonExecutable);
+      LOGGER.info("Python Exec: {}", pythonExecutable);
       String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable);
       if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
         throw new InterpreterException("Kernel prerequisite is not meet: " +
@@ -120,11 +121,11 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
               getProperty("zeppelin.jupyter.kernel.launch.timeout", "30000"));
       this.z = buildZeppelinContext();
       int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
-      int message_size = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size",
+      int messageSize = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size",
               32 * 1024 * 1024 + ""));
 
       jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1",
-              kernelPort).usePlaintext(true).maxInboundMessageSize(message_size),
+              kernelPort).usePlaintext(true).maxInboundMessageSize(messageSize),
               getProperties(), kernel);
       launchJupyterKernel(kernelPort);
     } catch (Exception e) {
@@ -153,17 +154,24 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
       int ret = proc.waitFor();
       if (ret != 0) {
         try (FileInputStream in = new FileInputStream(stderrFile)) {
-          return "Fail to run pip freeze.\n" + IOUtils.toString(in);
+          return "Fail to run pip freeze.\n" + IOUtils.toString(in, StandardCharsets.UTF_8);
         }
       }
       try (FileInputStream in = new FileInputStream(stdoutFile)) {
-        String freezeOutput = IOUtils.toString(in);
+        String freezeOutput = IOUtils.toString(in, StandardCharsets.UTF_8);
         for (String packageName : getRequiredPackages()) {
-          if (!freezeOutput.contains(packageName + "=")) {
+          /**
+           * Example line, if the Python package is installed with pip:
+           *  grpcio==1.18.0
+           * Example line, if the Python package is installed with conda:
+           *  grpcio @ file:///home/conda/feedstock_root/build_artifacts/grpcio_1604365513151/work
+           */
+          if (!freezeOutput.contains(packageName + "=") &&
+              !freezeOutput.contains(packageName + " ")) {
             return packageName + " is not installed.";
           }
         }
-        LOGGER.info("Prerequisite for kernel " + getKernelName() + " is met");
+        LOGGER.info("Prerequisite for kernel {} is met", getKernelName());
       }
     } catch (Exception e) {
       LOGGER.warn("Fail to checkKernelPrerequisite", e);
@@ -177,7 +185,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
 
   private void launchJupyterKernel(int kernelPort)
           throws IOException {
-    LOGGER.info("Launching Jupyter Kernel at port: " + kernelPort);
+    LOGGER.info("Launching Jupyter Kernel at port: {}", kernelPort);
     // copy the python scripts to a temp directory, then launch jupyter kernel in that folder
     this.kernelWorkDir = Files.createTempDirectory(
             "zeppelin_jupyter_kernel_" + getKernelName()).toFile();
@@ -191,7 +199,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
     CommandLine cmd = CommandLine.parse(pythonExecutable);
     cmd.addArgument(kernelWorkDir.getAbsolutePath() + "/kernel_server.py");
     cmd.addArgument(getKernelName());
-    cmd.addArgument(kernelPort + "");
+    cmd.addArgument(String.valueOf(kernelPort));
 
     Map<String, String> envs = setupKernelEnv();
     jupyterKernelProcessLauncher = new JupyterKernelProcessLauncher(cmd, envs);
@@ -289,7 +297,7 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
   @Override
   public List<InterpreterCompletion> completion(String buf, int cursor,
                                                 InterpreterContext interpreterContext) {
-    LOGGER.debug("Call completion for: " + buf + ", cursor: " + cursor);
+    LOGGER.debug("Call completion for: {}, cursor: {}", buf, cursor);
     List<InterpreterCompletion> completions = new ArrayList<>();
     CompletionResponse response =
             jupyterKernelClient.complete(
@@ -301,12 +309,13 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
       if (lastIndexOfDot != -1) {
         match = match.substring(lastIndexOfDot + 1);
       }
-      LOGGER.debug("Candidate completion: " + match);
+      LOGGER.debug("Candidate completion: {}", match);
       completions.add(new InterpreterCompletion(match, match, ""));
     }
     return completions;
   }
 
+  @Override
   public ZeppelinContext getZeppelinContext() {
     return z;
   }
diff --git a/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py b/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py
index 5643fab..0c7a0d1 100644
--- a/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py
+++ b/zeppelin-jupyter-interpreter/src/main/resources/grpc/jupyter/kernel_server.py
@@ -132,9 +132,10 @@ class KernelServer(kernel_pb2_grpc.JupyterKernelServicer):
                                                   output="Ipython kernel has been stopped. Please check logs. It might be because of an out of memory issue.")
         if payload_reply:
             result = []
-            for payload in payload_reply[0]['content']['payload']:
-                if payload['data']['text/plain']:
-                    result.append(payload['data']['text/plain'])
+            if 'payload' in payload_reply[0]['content']:
+                for payload in payload_reply[0]['content']['payload']:
+                    if payload['data']['text/plain']:
+                        result.append(payload['data']['text/plain'])
             if result:
                 yield kernel_pb2.ExecuteResponse(status=kernel_pb2.SUCCESS,
                                                   type=kernel_pb2.TEXT,