You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/06/21 14:38:02 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5330]. Support conda env for python interpreter in yarn mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new fd1286f  [ZEPPELIN-5330]. Support conda env for python interpreter in yarn mode
fd1286f is described below

commit fd1286f99165d46b4db213e4e5ad1129c672fa38
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Mon Jun 21 17:30:19 2021 +0800

    [ZEPPELIN-5330]. Support conda env for python interpreter in yarn mode
    
    ### What is this PR for?
    
    This is to run python interpreter in yarn mode and you can also customize the python runtime via conda. Check the `python.md` for more details
    
    ### What type of PR is it?
    [Feature | Documentation]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5330
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4097 from zjffdu/ZEPPELIN-5330 and squashes the following commits:
    
    b6a957fd3e [Jeff Zhang] address code review comments
    bdb4dcbbfa [Jeff Zhang] support hdfs file for
    ee8d11b8fd [Jeff Zhang] minor code refacotring
    a7dca0fd35 [Jeff Zhang] address comments
    5fc3e86543 [Jeff Zhang] Fix Ci
    e4a6c7146a [Jeff Zhang] address comments
    f7e6b13d4d [Jeff Zhang] save
    d5828d203b [Jeff Zhang] update python
    3c963b6b61 [Jeff Zhang] update
    fa36c5b997 [Jeff Zhang] update jupyter
    e25c6b3f3b [Jeff Zhang] address comment
    d27f580216 [Jeff Zhang] [ZEPPELIN-5330]. Support conda env for python interpreter in yarn mode
    
    (cherry picked from commit dfc36a4d53f01b5b9f00bbfad67dc3207d0c81c1)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 docs/interpreter/python.md                         | 96 +++++++++++++++++++++-
 .../apache/zeppelin/python/PythonInterpreter.java  |  7 +-
 .../zeppelin/jupyter/JupyterKernelInterpreter.java | 47 ++++++++++-
 .../launcher/YarnRemoteInterpreterProcess.java     | 72 +++++++++++++---
 .../launcher/SparkInterpreterLauncher.java         | 16 ++++
 5 files changed, 223 insertions(+), 15 deletions(-)

diff --git a/docs/interpreter/python.md b/docs/interpreter/python.md
index 6bb7f29..86fb1db 100644
--- a/docs/interpreter/python.md
+++ b/docs/interpreter/python.md
@@ -77,6 +77,16 @@ Zeppelin supports python language which is very popular in data analytics and ma
     IPython is only used in <code>%python.ipython</code>.
     </td>
   </tr>
+  <tr>
+    <td>zeppelin.yarn.dist.archives</td>
+    <td></td>
+    <td>Comma separated list of archives to be extracted into the working directory of interpreter. e.g. You can specify conda pack archive files via this property in python's yarn mode. It could be either files in local filesystem or files on hadoop compatible file systems</td>
+  </tr>
+  <tr>
+    <td>zeppelin.interpreter.conda.env.name</td>
+    <td></td>
+    <td>conda environment name, aka the folder name in the working directory of interpreter</td>
+  </tr>
 </table>
 
 
@@ -347,7 +357,91 @@ Python interpreter create a variable `z` which represent `ZeppelinContext` for y
   </tr>
 </table>
 
-## Python environments
+## Run Python in yarn cluster
+
+Zeppelin supports to run python interpreter in yarn cluster which means the python interpreter runs in the yarn container.
+This can achieve better multi-tenant for python interpreter especially when you already have a hadoop yarn cluster.
+
+But there's one critical problem to run python in yarn cluster: how to manage the python environment in yarn container. Because yarn cluster is a distributed cluster environemt
+which is composed many nodes, and your python interpreter can start in any node. It is not practical to manage python environment in each nodes.
+
+So in order to run python in yarn cluster, we would suggest you to use conda to manage your python environment, and Zeppelin can ship your
+codna environment to yarn container, so that each python interpreter can has its own python environment.
+
+### Step 1
+We would suggest you to use conda pack to create archives of conda environments, and ship it to yarn container. Otherwise python interpreter
+will use the python executable in PATH of yarn container.
+
+Here's one example of yml file which could be used to generate a conda environment with python 3 and some useful python libraries.
+
+* Create yml file for conda environment, write the following content into file `env_python_3.yml`
+
+```text
+name: python_3_env
+channels:
+  - conda-forge
+  - defaults
+dependencies:
+  - python=3.7 
+  - pycodestyle
+  - numpy
+  - pandas
+  - scipy
+  - grpcio
+  - protobuf
+  - pandasql
+  - ipython
+  - ipykernel
+  - jupyter_client
+  - panel
+  - pyyaml
+  - seaborn
+  - plotnine
+  - hvplot
+  - intake
+  - intake-parquet
+  - intake-xarray
+  - altair
+  - vega_datasets
+  - pyarrow
+
+```
+
+* Create conda environment via this yml file using either `conda` or `mamba`
+
+```bash
+
+conda env create -f env_python_3.yml
+```
+
+```bash
+
+mamba env create -f python_3_env
+```
+
+
+* Pack the conda environment using either `conda`
+
+```bash
+
+conda pack -n python_3
+```
+
+### Step 2
+
+Specify the following properties to enable yarn mode for python interpreter, and specify the correct python environment.
+
+```
+zeppelin.interpreter.launcher yarn
+zeppelin.yarn.dist.archives /home/hadoop/python_3.tar.gz#environment
+zeppelin.interpreter.conda.env.name environment
+```
+
+`zeppelin.yarn.dist.archives` is the python conda environment tar which is created in step 1.
+This tar will be shipped to yarn container and untar in the working directory of yarn container.
+`environment` in `/home/hadoop/python_3.tar.gz#environment` is the folder name after untar. This folder name should be the same as `zeppelin.interpreter.conda.env.name`.
+
+## Python environments (used for non-yarn mode)
 
 ### Default
 By default, PythonInterpreter will use python command defined in `zeppelin.python` property to run python process.
diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
index e403a59..57ee9ac 100644
--- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
+++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java
@@ -223,9 +223,12 @@ public class PythonInterpreter extends Interpreter {
 
   // Run python script
   // Choose python in the order of
-  // condaPythonExec > zeppelin.python
+  // {conda.env.name}/bin/python > condaPythonExec > zeppelin.python
   protected String getPythonExec() {
-    if (condaPythonExec != null) {
+    String condaEnv = getProperty("zeppelin.interpreter.conda.env.name");
+    if (StringUtils.isNotBlank(condaEnv)) {
+      return condaEnv + "/bin/python";
+    } else if (condaPythonExec != null) {
       return condaPythonExec;
     } else {
       return getProperty("zeppelin.python", "python");
diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
index 386a69b..6c2fdc5 100644
--- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
+++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelInterpreter.java
@@ -19,6 +19,8 @@ package org.apache.zeppelin.jupyter;
 
 import io.grpc.ManagedChannelBuilder;
 import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.PumpStreamHandler;
 import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
@@ -45,8 +47,10 @@ import org.apache.zeppelin.interpreter.util.ProcessLauncher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
@@ -73,7 +77,8 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
   // working directory of jupyter kernel
   protected File kernelWorkDir;
   // python executable file for launching the jupyter kernel
-  private String pythonExecutable;
+  protected String pythonExecutable;
+  protected String condaEnv;
   private int kernelLaunchTimeout;
 
   private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
@@ -110,7 +115,14 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
         // JupyterKernelInterpreter might already been opened
         return;
       }
-      pythonExecutable = getProperty("zeppelin.python", "python");
+
+      String envName = getProperty("zeppelin.interpreter.conda.env.name");
+      if (StringUtils.isNotBlank(envName)) {
+        pythonExecutable = activateCondaEnv(envName);
+      } else {
+        pythonExecutable = getProperty("zeppelin.python", "python");
+      }
+
       LOGGER.info("Python Exec: {}", pythonExecutable);
       String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable);
       if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
@@ -183,8 +195,39 @@ public class JupyterKernelInterpreter extends AbstractInterpreter {
     return "";
   }
 
+  private String activateCondaEnv(String envName) throws IOException {
+    LOGGER.info("Activating conda env: {}", envName);
+    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+    PumpStreamHandler psh = new PumpStreamHandler(stdout);
+
+    if (!new File(envName).exists()) {
+      throw new IOException("Fail to activating conda env because no environment folder: " +
+              envName);
+    }
+    File scriptFile = Files.createTempFile("zeppelin_jupyter_kernel_", ".sh").toFile();
+    try (FileWriter writer = new FileWriter(scriptFile)) {
+      IOUtils.write(String.format("chmod 777 -R %s \nsource %s/bin/activate \nconda-unpack",
+              envName, envName),
+              writer);
+    }
+    scriptFile.setExecutable(true, false);
+    scriptFile.setReadable(true, false);
+    CommandLine cmd = new CommandLine(scriptFile.getAbsolutePath());
+    DefaultExecutor executor = new DefaultExecutor();
+    executor.setStreamHandler(psh);
+    int exitCode = executor.execute(cmd);
+    if (exitCode != 0) {
+      throw new IOException("Fail to activate conda env, " + stdout.toString());
+    } else {
+      LOGGER.info("Activate conda env successfully");
+      this.condaEnv = envName;
+      return envName + "/bin/python";
+    }
+  }
+
   private void launchJupyterKernel(int kernelPort)
           throws IOException {
+
     LOGGER.info("Launching Jupyter Kernel at port: {}", kernelPort);
     // copy the python scripts to a temp directory, then launch jupyter kernel in that folder
     this.kernelWorkDir = Files.createTempDirectory(
diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
index f7037e4..b7ad5b5 100644
--- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.io.FileUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +29,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -57,6 +57,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -101,28 +103,30 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
           Map<String, String> envs,
           int connectTimeout,
           int connectionPoolSize) {
-    super(connectTimeout, connectionPoolSize, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort());
+    super(connectTimeout,
+            connectionPoolSize,
+            launchContext.getIntpEventServerHost(),
+            launchContext.getIntpEventServerPort());
     this.zConf = ZeppelinConfiguration.create();
     this.launchContext = launchContext;
     this.properties = properties;
     this.envs = envs;
-
-    yarnClient = YarnClient.createYarnClient();
     this.hadoopConf = new YarnConfiguration();
-
     // Add core-site.xml and yarn-site.xml. This is for integration test where using MiniHadoopCluster.
     if (properties.containsKey("HADOOP_CONF_DIR") &&
-            !org.apache.commons.lang3.StringUtils.isBlank(properties.getProperty("HADOOP_CONF_DIR"))) {
+            !StringUtils.isBlank(properties.getProperty("HADOOP_CONF_DIR"))) {
       File hadoopConfDir = new File(properties.getProperty("HADOOP_CONF_DIR"));
       if (hadoopConfDir.exists() && hadoopConfDir.isDirectory()) {
         File coreSite = new File(hadoopConfDir, "core-site.xml");
         try {
+          LOGGER.info("Adding resource: {}", coreSite.getAbsolutePath());
           this.hadoopConf.addResource(coreSite.toURI().toURL());
         } catch (MalformedURLException e) {
           LOGGER.warn("Fail to add core-site.xml: " + coreSite.getAbsolutePath(), e);
         }
         File yarnSite = new File(hadoopConfDir, "yarn-site.xml");
         try {
+          LOGGER.info("Adding resource: {}", yarnSite.getAbsolutePath());
           this.hadoopConf.addResource(yarnSite.toURI().toURL());
         } catch (MalformedURLException e) {
           LOGGER.warn("Fail to add yarn-site.xml: " + yarnSite.getAbsolutePath(), e);
@@ -133,6 +137,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
       }
     }
 
+    yarnClient = YarnClient.createYarnClient();
     yarnClient.init(this.hadoopConf);
     yarnClient.start();
     try {
@@ -228,7 +233,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
     setQueue(appContext);
     appContext.setApplicationId(appId);
     setApplicationName(appContext);
-    appContext.setApplicationType("ZEPPELIN INTERPRETER");
+    appContext.setApplicationType("Zeppelin Interpreter");
     appContext.setMaxAppAttempts(1);
 
     ContainerLaunchContext amContainer = setUpAMLaunchContext();
@@ -242,12 +247,14 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
 
     // Set the resources to localize
     this.stagingDir = new Path(fs.getHomeDirectory() + "/.zeppelinStaging", appId.toString());
+    LOGGER.info("Use staging directory: {}", this.stagingDir);
     Map<String, LocalResource> localResources = new HashMap<>();
 
     File interpreterZip = createInterpreterZip();
     Path srcPath = localFs.makeQualified(new Path(interpreterZip.toURI()));
     Path destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
     addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "zeppelin");
+    LOGGER.info("Add zeppelin archive: {}", destPath);
     FileUtils.forceDelete(interpreterZip);
 
     // TODO(zjffdu) Should not add interpreter specific logic here.
@@ -259,13 +266,48 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
       FileUtils.forceDelete(flinkZip);
 
       String hiveConfDir = launchContext.getProperties().getProperty("HIVE_CONF_DIR");
-      if (!org.apache.commons.lang3.StringUtils.isBlank(hiveConfDir)) {
+      if (!StringUtils.isBlank(hiveConfDir)) {
         File hiveConfZipFile = createHiveConfZip(new File(hiveConfDir));
         srcPath = localFs.makeQualified(new Path(hiveConfZipFile.toURI()));
         destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
         addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, "hive_conf");
       }
     }
+
+    String yarnDistArchives = launchContext.getProperties().getProperty("zeppelin.yarn.dist.archives");
+    if (StringUtils.isNotBlank(yarnDistArchives)) {
+      for (String distArchive : yarnDistArchives.split(",")) {
+        URI distArchiveURI = null;
+        try {
+          distArchiveURI = new URI(distArchive);
+        } catch (URISyntaxException e) {
+          throw new IOException("Invalid uri: " + distArchive, e);
+        }
+        if ("file".equals(distArchiveURI.getScheme())) {
+          // zeppelin.yarn.dist.archives is local file
+          srcPath = localFs.makeQualified(new Path(distArchiveURI));
+          destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
+        } else {
+          // zeppelin.yarn.dist.archives is files on any hadoop compatible file system
+          destPath = new Path(removeFragment(distArchive));
+        }
+        String linkName = srcPath.getName();
+        if (distArchiveURI.getFragment() != null) {
+          linkName = distArchiveURI.getFragment();
+        }
+        addResource(fs, destPath, localResources, LocalResourceType.ARCHIVE, linkName);
+      }
+    }
+    String yarnDistFiles = launchContext.getProperties().getProperty("zeppelin.yarn.dist.files");
+    if (StringUtils.isNotBlank(yarnDistFiles)) {
+      for (String localFile : yarnDistFiles.split(",")) {
+        srcPath = localFs.makeQualified(new Path(localFile));
+        destPath = copyFileToRemote(stagingDir, srcPath, (short) 1);
+        addResource(fs, destPath, localResources, LocalResourceType.FILE, srcPath.getName());
+        LOGGER.info("Add dist file: {}", destPath);
+      }
+    }
+
     amContainer.setLocalResources(localResources);
 
     // Setup the command to run the AM
@@ -316,6 +358,15 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
     return amContainer;
   }
 
+  private String removeFragment(String path) {
+    int pos = path.lastIndexOf("#");
+    if (pos != -1) {
+      return path.substring(0, pos);
+    } else {
+      return path;
+    }
+  }
+
   /**
    * Populate the classpath entry in the given environment map with any application
    * classpath specified through the Hadoop and Yarn configurations.
@@ -324,7 +375,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
     List<String> yarnClassPath = Lists.newArrayList(getYarnAppClasspath());
     List<String> mrClassPath = Lists.newArrayList(getMRAppClasspath());
     yarnClassPath.addAll(mrClassPath);
-    LOGGER.info("Adding hadoop classpath: " + org.apache.commons.lang3.StringUtils.join(yarnClassPath, ":"));
+    LOGGER.info("Adding hadoop classpath: {}", StringUtils.join(yarnClassPath, ":"));
     for (String path : yarnClassPath) {
       String newValue = path;
       if (envs.containsKey(ApplicationConstants.Environment.CLASSPATH.name())) {
@@ -361,7 +412,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
   }
 
   private String[] getDefaultMRApplicationClasspath() {
-    return StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
+    return org.apache.hadoop.util.StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
   }
 
   private void setResources(ApplicationSubmissionContext appContext) {
@@ -534,6 +585,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
           LocalResourceType resourceType,
           String link) throws IOException {
 
+    LOGGER.info("Add resource: {}, type: {}, link: {}", destPath, resourceType, link);
     FileStatus destStatus = fs.getFileStatus(destPath);
     LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
     amJarRsrc.setType(resourceType);
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index e87ef81..932cf45 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -88,6 +88,15 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
 
     setupPropertiesForPySpark(sparkProperties);
     setupPropertiesForSparkR(sparkProperties);
+
+    String condaEnvName = context.getProperties().getProperty("zeppelin.interpreter.conda.env.name");
+    if (StringUtils.isNotBlank(condaEnvName)) {
+      if (!isYarnCluster()) {
+        throw new IOException("zeppelin.interpreter.conda.env.name only works for yarn-cluster mode");
+      }
+      sparkProperties.setProperty("spark.pyspark.python", condaEnvName + "/bin/python");
+    }
+
     if (isYarnMode() && getDeployMode().equals("cluster")) {
       env.put("ZEPPELIN_SPARK_YARN_CLUSTER", "true");
       sparkProperties.setProperty("spark.yarn.submit.waitAppCompletion", "false");
@@ -397,4 +406,11 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher {
     return getSparkMaster().startsWith("yarn");
   }
 
+  private boolean isYarnCluster() {
+    return isYarnMode() && "cluster".equalsIgnoreCase(getDeployMode());
+  }
+
+  private boolean isYarnClient() {
+    return isYarnMode() && "client".equalsIgnoreCase(getDeployMode());
+  }
 }