You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original message and is not preserved in this plain-text rendering.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/10/18 09:33:22 UTC

[dolphinscheduler] branch dev updated: [Fix-6529] Improve the python task to solve the issue about getting the environment variables. (#6531)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8200bc1  [Fix-6529] Improve the python task to solve the issue about getting the environment variables. (#6531)
8200bc1 is described below

commit 8200bc14526364fa1908b1df82befc984b0419aa
Author: Hua Jiang <ji...@163.com>
AuthorDate: Mon Oct 18 17:33:16 2021 +0800

    [Fix-6529] Improve the python task to solve the issue about getting the environment variables. (#6531)
    
    * modify config
    
    * improve python task
    
    * improve python task
---
 .../plugin/task/python/PythonCommandExecutor.java  | 205 ---------------------
 .../plugin/task/python/PythonTask.java             |  77 ++++++--
 2 files changed, 61 insertions(+), 221 deletions(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
deleted file mode 100644
index d40f61b..0000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.python;
-
-import java.util.Arrays;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.io.FileUtils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * python command executor
- */
-public class PythonCommandExecutor extends AbstractCommandExecutor {
-
-    /**
-     * logger
-     */
-    private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);
-
-    /**
-     * python
-     */
-    public static final String PYTHON = "python";
-
-    private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
-
-    /**
-     * constructor
-     *
-     * @param logHandler log handler
-     * @param taskRequest TaskRequest
-     * @param logger logger
-     */
-    public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
-                                 TaskRequest taskRequest,
-                                 Logger logger) {
-        super(logHandler, taskRequest, logger);
-    }
-
-
-    /**
-     * build command file path
-     *
-     * @return command file path
-     */
-    @Override
-    protected String buildCommandFilePath() {
-        return String.format("%s/py_%s.command", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
-    }
-
-    /**
-     * create command file if not exists
-     *
-     * @param execCommand exec command
-     * @param commandFile command file
-     * @throws IOException io exception
-     */
-    @Override
-    protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
-        logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath());
-
-        if (!Files.exists(Paths.get(commandFile))) {
-            logger.info("generate command file:{}", commandFile);
-
-            StringBuilder sb = new StringBuilder();
-            sb.append("#-*- encoding=utf8 -*-\n");
-
-            sb.append("\n\n");
-            sb.append(execCommand);
-            logger.info(sb.toString());
-
-            // write data to file
-            FileUtils.writeStringToFile(new File(commandFile),
-                    sb.toString(),
-                    StandardCharsets.UTF_8);
-        }
-    }
-
-    /**
-     * get the absolute path of the Python command
-     * note :
-     * common.properties
-     * PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
-     * <p>
-     * for example :
-     * your PYTHON_HOM is /opt/python3.7/
-     * you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties
-     * dolphinscheduler.env.path file.
-     *
-     * @param envPath env path
-     * @return python home
-     */
-    private static String getPythonHome(String envPath) {
-        // BufferedReader br = null;
-        StringBuilder sb = new StringBuilder();
-        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));) {
-            String line;
-            while ((line = br.readLine()) != null) {
-                if (line.contains(PythonConstants.PYTHON_HOME)) {
-                    sb.append(line);
-                    break;
-                }
-            }
-            String result = sb.toString();
-            if (StringUtils.isEmpty(result)) {
-                return null;
-            }
-            String[] arrs = result.split(PythonConstants.EQUAL_SIGN);
-            if (arrs.length == 2) {
-                return arrs[1];
-            }
-        } catch (IOException e) {
-            logger.error("read file failure", e);
-        }
-        return null;
-    }
-
-    /**
-     * Gets the command path to which Python can execute
-     * @return python command path
-     */
-    @Override
-    protected String commandInterpreter() {
-        String pythonHome = getPythonHome(taskRequest.getEnvFile());
-
-        if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
-            pythonHome = getPythonHomeFromEnvironmentConfig(taskRequest.getEnvironmentConfig());
-        }
-
-        return getPythonCommand(pythonHome);
-    }
-
-    /**
-     * get python command
-     *
-     * @param pythonHome python home
-     * @return python command
-     */
-    public static String getPythonCommand(String pythonHome) {
-        if (StringUtils.isEmpty(pythonHome)) {
-            return PYTHON;
-        }
-        File file = new File(pythonHome);
-        if (file.exists() && file.isFile()) {
-            return pythonHome;
-        }
-        if (PYTHON_PATH_PATTERN.matcher(pythonHome).find()) {
-            return pythonHome;
-        }
-        return Paths.get(pythonHome, "/bin/python").toString();
-    }
-
-    /**
-     * get python home from the environment config
-     *
-     * @param environmentConfig env config
-     * @return python home
-     */
-    public static String getPythonHomeFromEnvironmentConfig(String environmentConfig) {
-        String[] lines = environmentConfig.split("\n");
-
-        String pythonHomeConfig = Arrays.stream(lines).filter(line -> line.contains(PythonConstants.PYTHON_HOME)).findFirst().get();
-
-        if (StringUtils.isEmpty(pythonHomeConfig)) {
-            return null;
-        }
-        String[] arrs = pythonHomeConfig.split(PythonConstants.EQUAL_SIGN);
-        if (arrs.length == 2) {
-            return arrs[1];
-        }
-        return null;
-    }
-}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 01457da..382f177 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -18,6 +18,7 @@
 package org.apache.dolphinscheduler.plugin.task.python;
 
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
 import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
@@ -29,6 +30,13 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
 import org.apache.dolphinscheduler.spi.utils.JSONUtils;
 
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -43,14 +51,9 @@ public class PythonTask extends AbstractTaskExecutor {
     private PythonParameters pythonParameters;
 
     /**
-     * task dir
-     */
-    private String taskDir;
-
-    /**
-     * python command executor
+     * shell command executor
      */
-    private PythonCommandExecutor pythonCommandExecutor;
+    private ShellCommandExecutor shellCommandExecutor;
 
     private TaskRequest taskRequest;
 
@@ -63,7 +66,7 @@ public class PythonTask extends AbstractTaskExecutor {
         super(taskRequest);
         this.taskRequest = taskRequest;
 
-        this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
+        this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
                 taskRequest,
                 logger);
     }
@@ -93,13 +96,20 @@ public class PythonTask extends AbstractTaskExecutor {
     @Override
     public void handle() throws Exception {
         try {
-            // construct process
-            String command = buildCommand();
-            TaskResponse taskResponse = pythonCommandExecutor.run(command);
+            // generate the content of this python script
+            String pythonScriptContent = buildPythonScriptContent();
+            // generate the file path of this python script
+            String pythonScriptFile = buildPythonCommandFilePath();
+
+            // create this file
+            createPythonCommandFileIfNotExists(pythonScriptContent,pythonScriptFile);
+            String command = "python " + pythonScriptFile;
+
+            TaskResponse taskResponse = shellCommandExecutor.run(command);
             setExitStatusCode(taskResponse.getExitStatusCode());
             setAppIds(taskResponse.getAppIds());
             setProcessId(taskResponse.getProcessId());
-            setVarPool(pythonCommandExecutor.getVarPool());
+            setVarPool(shellCommandExecutor.getVarPool());
         } catch (Exception e) {
             logger.error("python task failure", e);
             setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
@@ -110,7 +120,7 @@ public class PythonTask extends AbstractTaskExecutor {
     @Override
     public void cancelApplication(boolean cancelApplication) throws Exception {
         // cancel process
-        pythonCommandExecutor.cancelApplication();
+        shellCommandExecutor.cancelApplication();
     }
 
     @Override
@@ -151,12 +161,48 @@ public class PythonTask extends AbstractTaskExecutor {
     }
 
     /**
-     * build command
+     * create python command file if not exists
+     *
+     * @param pythonScript exec python script
+     * @param pythonScriptFile python script file
+     * @throws IOException io exception
+     */
+    protected void createPythonCommandFileIfNotExists(String pythonScript, String pythonScriptFile) throws IOException {
+        logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath());
+
+        if (!Files.exists(Paths.get(pythonScriptFile))) {
+            logger.info("generate python script file:{}", pythonScriptFile);
+
+            StringBuilder sb = new StringBuilder();
+            sb.append("#-*- encoding=utf8 -*-\n");
+
+            sb.append("\n\n");
+            sb.append(pythonScript);
+            logger.info(sb.toString());
+
+            // write data to file
+            FileUtils.writeStringToFile(new File(pythonScriptFile),
+                sb.toString(),
+                StandardCharsets.UTF_8);
+        }
+    }
+
+    /**
+     * build python command file path
+     *
+     * @return python command file path
+     */
+    protected String buildPythonCommandFilePath() {
+        return String.format("%s/py_%s.py", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
+    }
+
+    /**
+     * build python script content
      *
      * @return raw python script
      * @throws Exception exception
      */
-    private String buildCommand() throws Exception {
+    private String buildPythonScriptContent() throws Exception {
         String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
 
         // replace placeholder
@@ -170,7 +216,6 @@ public class PythonTask extends AbstractTaskExecutor {
         rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
 
         logger.info("raw python script : {}", pythonParameters.getRawScript());
-        logger.info("task dir : {}", taskDir);
 
         return rawPythonScript;
     }