You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2020/12/20 13:17:49 UTC
[incubator-dolphinscheduler] branch dev updated:
[Improvement-4069][server] When the tenant does not exist,
the task execution should throw an exception (#4108)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 3bcd3fb [Improvement-4069][server] When the tenant does not exist, the task execution should throw an exception (#4108)
3bcd3fb is described below
commit 3bcd3fbcd344983ed35f997bed1fea2c0cf6055d
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Sun Dec 20 21:17:39 2020 +0800
[Improvement-4069][server] When the tenant does not exist, the task execution should throw an exception (#4108)
* when the tenant does not exist, the task execution should throw an exception
* remote method createWorkDirAndUserIfAbsent
* set the task status failed when the tenant code does not exist.
* add taskLog.
* update check os user exists
* update TaskExecuteThreadTest test method.
* solving sonar fail.
---
.../dolphinscheduler/common/utils/FileUtils.java | 26 ++-----------------
.../common/utils/FileUtilsTest.java | 4 +--
.../worker/processor/TaskExecuteProcessor.java | 3 ++-
.../server/worker/runner/TaskExecuteThread.java | 29 ++++++++++++++--------
.../worker/runner/TaskExecuteThreadTest.java | 13 +++++++++-
5 files changed, 36 insertions(+), 39 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index f0e3ec8..0dcfbdd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -158,13 +158,12 @@ public class FileUtils {
}
/**
- * create directory and user
+ * create directory if absent
*
* @param execLocalPath execute local path
- * @param userName user name
* @throws IOException errors
*/
- public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException {
+ public static void createWorkDirIfAbsent(String execLocalPath) throws IOException {
//if work dir exists, first delete
File execLocalPathFile = new File(execLocalPath);
@@ -177,27 +176,6 @@ public class FileUtils {
String mkdirLog = "create dir success " + execLocalPath;
LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog);
-
- //if not exists this user,then create
- OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
- try {
- if (!OSUtils.getUserList().contains(userName)) {
- boolean isSuccessCreateUser = OSUtils.createUser(userName);
-
- String infoLog;
- if (isSuccessCreateUser) {
- infoLog = String.format("create user name success %s", userName);
- } else {
- infoLog = String.format("create user name fail %s", userName);
- }
- LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
- LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
- }
- } catch (Throwable e) {
- LoggerUtils.logError(Optional.ofNullable(logger), e);
- LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
- }
- OSUtils.taskLoggerThreadLocal.remove();
}
/**
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
index f87628c..a4a39ae 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
@@ -67,9 +67,9 @@ public class FileUtilsTest {
}
@Test
- public void testCreateWorkDirAndUserIfAbsent() {
+ public void testCreateWorkDirIfAbsent() {
try {
- FileUtils.createWorkDirAndUserIfAbsent("/tmp/createWorkDirAndUserIfAbsent", "test123");
+ FileUtils.createWorkDirIfAbsent("/tmp/createWorkDirAndUserIfAbsent");
Assert.assertTrue(true);
} catch (Exception e) {
Assert.assertTrue(false);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3fe3b6d..e43a913 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -116,6 +116,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.error("task execution context is null");
return;
}
+
setTaskCache(taskExecutionContext);
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@@ -134,7 +135,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
FileUtils.taskLoggerThreadLocal.set(taskLogger);
try {
- FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
+ FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
String errorLog = String.format("create execLocalPath : %s", execLocalPath);
LoggerUtils.logError(Optional.of(logger), errorLog, ex);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 6baeae9..c1d03e3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
@@ -23,11 +24,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -57,7 +58,6 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
-
/**
* task scheduler thread
*/
@@ -113,6 +113,15 @@ public class TaskExecuteThread implements Runnable {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
+ // check if the OS user exists
+ if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
+ String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
+ taskLogger.error(errorLog);
+ responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
+ responseCommand.setEndTime(new Date());
+ return;
+ }
+
// task node
TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
@@ -199,10 +208,10 @@ public class TaskExecuteThread implements Runnable {
// the default timeout is the maximum value of the integer
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
- if (taskTimeoutParameter.getEnable()){
+ if (taskTimeoutParameter.getEnable()) {
// get timeout strategy
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
- switch (taskTimeoutParameter.getStrategy()){
+ switch (taskTimeoutParameter.getStrategy()) {
case WARN:
break;
case FAILED:
@@ -223,21 +232,19 @@ public class TaskExecuteThread implements Runnable {
}
}
-
/**
* kill task
*/
- public void kill(){
- if (task != null){
+ public void kill() {
+ if (task != null) {
try {
task.cancelApplication(true);
- }catch (Exception e){
+ } catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
-
/**
* download resource file
*
@@ -248,7 +255,7 @@ public class TaskExecuteThread implements Runnable {
private void downloadResource(String execLocalPath,
Map<String,String> projectRes,
Logger logger) throws Exception {
- if (MapUtils.isEmpty(projectRes)){
+ if (MapUtils.isEmpty(projectRes)) {
return;
}
@@ -265,7 +272,7 @@ public class TaskExecuteThread implements Runnable {
logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
- }catch (Exception e){
+ } catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index f0c68d1..86d3f88 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -33,7 +34,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import org.junit.Assert;
import org.junit.Before;
@@ -49,7 +52,7 @@ import org.slf4j.LoggerFactory;
* test task execute thread.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class})
+@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class, OSUtils.class})
public class TaskExecuteThreadTest {
private TaskExecutionContext taskExecutionContext;
@@ -110,6 +113,12 @@ public class TaskExecuteThreadTest {
PowerMockito.mockStatic(CommonUtils.class);
PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
+
+ List<String> osUserList = new ArrayList<String>() {{
+ add("test");
+ }};
+ PowerMockito.mockStatic(OSUtils.class);
+ PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList);
}
@Test
@@ -117,6 +126,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setTaskType("SQL");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+ taskExecutionContext.setTenantCode("test");
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
taskExecuteThread.run();
taskExecutionContext.getCurrentExecutionStatus();
@@ -132,6 +142,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setStartTime(null);
taskExecutionContext.setDelayTime(1);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
+ taskExecutionContext.setTenantCode("test");
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
taskExecuteThread.run();