You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/10/18 09:33:22 UTC
[dolphinscheduler] branch dev updated: [Fix-6529] Improve the
python task to solve the issue about getting the environment variables.
(#6531)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8200bc1 [Fix-6529] Improve the python task to solve the issue about getting the environment variables. (#6531)
8200bc1 is described below
commit 8200bc14526364fa1908b1df82befc984b0419aa
Author: Hua Jiang <ji...@163.com>
AuthorDate: Mon Oct 18 17:33:16 2021 +0800
[Fix-6529] Improve the python task to solve the issue about getting the environment variables. (#6531)
* modify config
* improve python task
* improve python task
---
.../plugin/task/python/PythonCommandExecutor.java | 205 ---------------------
.../plugin/task/python/PythonTask.java | 77 ++++++--
2 files changed, 61 insertions(+), 221 deletions(-)
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
deleted file mode 100644
index d40f61b..0000000
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonCommandExecutor.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.task.python;
-
-import java.util.Arrays;
-import org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor;
-import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
-import org.apache.dolphinscheduler.spi.utils.StringUtils;
-
-import org.apache.commons.io.FileUtils;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Consumer;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * python command executor
- */
-public class PythonCommandExecutor extends AbstractCommandExecutor {
-
- /**
- * logger
- */
- private static final Logger logger = LoggerFactory.getLogger(PythonCommandExecutor.class);
-
- /**
- * python
- */
- public static final String PYTHON = "python";
-
- private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
-
- /**
- * constructor
- *
- * @param logHandler log handler
- * @param taskRequest TaskRequest
- * @param logger logger
- */
- public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
- TaskRequest taskRequest,
- Logger logger) {
- super(logHandler, taskRequest, logger);
- }
-
-
- /**
- * build command file path
- *
- * @return command file path
- */
- @Override
- protected String buildCommandFilePath() {
- return String.format("%s/py_%s.command", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
- }
-
- /**
- * create command file if not exists
- *
- * @param execCommand exec command
- * @param commandFile command file
- * @throws IOException io exception
- */
- @Override
- protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
- logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath());
-
- if (!Files.exists(Paths.get(commandFile))) {
- logger.info("generate command file:{}", commandFile);
-
- StringBuilder sb = new StringBuilder();
- sb.append("#-*- encoding=utf8 -*-\n");
-
- sb.append("\n\n");
- sb.append(execCommand);
- logger.info(sb.toString());
-
- // write data to file
- FileUtils.writeStringToFile(new File(commandFile),
- sb.toString(),
- StandardCharsets.UTF_8);
- }
- }
-
- /**
- * get the absolute path of the Python command
- * note :
- * common.properties
- * PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
- * <p>
- * for example :
- * your PYTHON_HOM is /opt/python3.7/
- * you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties
- * dolphinscheduler.env.path file.
- *
- * @param envPath env path
- * @return python home
- */
- private static String getPythonHome(String envPath) {
- // BufferedReader br = null;
- StringBuilder sb = new StringBuilder();
- try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));) {
- String line;
- while ((line = br.readLine()) != null) {
- if (line.contains(PythonConstants.PYTHON_HOME)) {
- sb.append(line);
- break;
- }
- }
- String result = sb.toString();
- if (StringUtils.isEmpty(result)) {
- return null;
- }
- String[] arrs = result.split(PythonConstants.EQUAL_SIGN);
- if (arrs.length == 2) {
- return arrs[1];
- }
- } catch (IOException e) {
- logger.error("read file failure", e);
- }
- return null;
- }
-
- /**
- * Gets the command path to which Python can execute
- * @return python command path
- */
- @Override
- protected String commandInterpreter() {
- String pythonHome = getPythonHome(taskRequest.getEnvFile());
-
- if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
- pythonHome = getPythonHomeFromEnvironmentConfig(taskRequest.getEnvironmentConfig());
- }
-
- return getPythonCommand(pythonHome);
- }
-
- /**
- * get python command
- *
- * @param pythonHome python home
- * @return python command
- */
- public static String getPythonCommand(String pythonHome) {
- if (StringUtils.isEmpty(pythonHome)) {
- return PYTHON;
- }
- File file = new File(pythonHome);
- if (file.exists() && file.isFile()) {
- return pythonHome;
- }
- if (PYTHON_PATH_PATTERN.matcher(pythonHome).find()) {
- return pythonHome;
- }
- return Paths.get(pythonHome, "/bin/python").toString();
- }
-
- /**
- * get python home from the environment config
- *
- * @param environmentConfig env config
- * @return python home
- */
- public static String getPythonHomeFromEnvironmentConfig(String environmentConfig) {
- String[] lines = environmentConfig.split("\n");
-
- String pythonHomeConfig = Arrays.stream(lines).filter(line -> line.contains(PythonConstants.PYTHON_HOME)).findFirst().get();
-
- if (StringUtils.isEmpty(pythonHomeConfig)) {
- return null;
- }
- String[] arrs = pythonHomeConfig.split(PythonConstants.EQUAL_SIGN);
- if (arrs.length == 2) {
- return arrs[1];
- }
- return null;
- }
-}
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
index 01457da..382f177 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.python;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
+import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
@@ -29,6 +30,13 @@ import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@@ -43,14 +51,9 @@ public class PythonTask extends AbstractTaskExecutor {
private PythonParameters pythonParameters;
/**
- * task dir
- */
- private String taskDir;
-
- /**
- * python command executor
+ * shell command executor
*/
- private PythonCommandExecutor pythonCommandExecutor;
+ private ShellCommandExecutor shellCommandExecutor;
private TaskRequest taskRequest;
@@ -63,7 +66,7 @@ public class PythonTask extends AbstractTaskExecutor {
super(taskRequest);
this.taskRequest = taskRequest;
- this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
logger);
}
@@ -93,13 +96,20 @@ public class PythonTask extends AbstractTaskExecutor {
@Override
public void handle() throws Exception {
try {
- // construct process
- String command = buildCommand();
- TaskResponse taskResponse = pythonCommandExecutor.run(command);
+ // generate the content of this python script
+ String pythonScriptContent = buildPythonScriptContent();
+ // generate the file path of this python script
+ String pythonScriptFile = buildPythonCommandFilePath();
+
+ // create this file
+ createPythonCommandFileIfNotExists(pythonScriptContent,pythonScriptFile);
+ String command = "python " + pythonScriptFile;
+
+ TaskResponse taskResponse = shellCommandExecutor.run(command);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
- setVarPool(pythonCommandExecutor.getVarPool());
+ setVarPool(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("python task failure", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
@@ -110,7 +120,7 @@ public class PythonTask extends AbstractTaskExecutor {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
- pythonCommandExecutor.cancelApplication();
+ shellCommandExecutor.cancelApplication();
}
@Override
@@ -151,12 +161,48 @@ public class PythonTask extends AbstractTaskExecutor {
}
/**
- * build command
+ * create python command file if not exists
+ *
+ * @param pythonScript exec python script
+ * @param pythonScriptFile python script file
+ * @throws IOException io exception
+ */
+ protected void createPythonCommandFileIfNotExists(String pythonScript, String pythonScriptFile) throws IOException {
+ logger.info("tenantCode :{}, task dir:{}", taskRequest.getTenantCode(), taskRequest.getExecutePath());
+
+ if (!Files.exists(Paths.get(pythonScriptFile))) {
+ logger.info("generate python script file:{}", pythonScriptFile);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("#-*- encoding=utf8 -*-\n");
+
+ sb.append("\n\n");
+ sb.append(pythonScript);
+ logger.info(sb.toString());
+
+ // write data to file
+ FileUtils.writeStringToFile(new File(pythonScriptFile),
+ sb.toString(),
+ StandardCharsets.UTF_8);
+ }
+ }
+
+ /**
+ * build python command file path
+ *
+ * @return python command file path
+ */
+ protected String buildPythonCommandFilePath() {
+ return String.format("%s/py_%s.py", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
+ }
+
+ /**
+ * build python script content
*
* @return raw python script
* @throws Exception exception
*/
- private String buildCommand() throws Exception {
+ private String buildPythonScriptContent() throws Exception {
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// replace placeholder
@@ -170,7 +216,6 @@ public class PythonTask extends AbstractTaskExecutor {
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
logger.info("raw python script : {}", pythonParameters.getRawScript());
- logger.info("task dir : {}", taskDir);
return rawPythonScript;
}