You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2020/12/20 13:17:49 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement-4069][server] When the tenant does not exist, the task execution should throw an exception (#4108)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3bcd3fb  [Improvement-4069][server] When the tenant does not exist, the task execution should throw an exception (#4108)
3bcd3fb is described below

commit 3bcd3fbcd344983ed35f997bed1fea2c0cf6055d
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Sun Dec 20 21:17:39 2020 +0800

    [Improvement-4069][server] When the tenant does not exist, the task execution should throw an exception (#4108)
    
    * when the tenant does not exist, the task execution should throw an exception
    
    * remove method createWorkDirAndUserIfAbsent
    
    * set the task status failed when the tenant code does not exist.
    
    * add taskLog.
    
    * update the check for whether the OS user exists
    
    * update TaskExecuteThreadTest test method.
    
    * fix Sonar failures.
---
 .../dolphinscheduler/common/utils/FileUtils.java   | 26 ++-----------------
 .../common/utils/FileUtilsTest.java                |  4 +--
 .../worker/processor/TaskExecuteProcessor.java     |  3 ++-
 .../server/worker/runner/TaskExecuteThread.java    | 29 ++++++++++++++--------
 .../worker/runner/TaskExecuteThreadTest.java       | 13 +++++++++-
 5 files changed, 36 insertions(+), 39 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index f0e3ec8..0dcfbdd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -158,13 +158,12 @@ public class FileUtils {
     }
 
     /**
-     * create directory and user
+     * create directory if absent
      *
      * @param execLocalPath execute local path
-     * @param userName user name
      * @throws IOException errors
      */
-    public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException {
+    public static void createWorkDirIfAbsent(String execLocalPath) throws IOException {
         //if work dir exists, first delete
         File execLocalPathFile = new File(execLocalPath);
 
@@ -177,27 +176,6 @@ public class FileUtils {
         String mkdirLog = "create dir success " + execLocalPath;
         LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog);
         LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog);
-
-        //if not exists this user,then create
-        OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
-        try {
-            if (!OSUtils.getUserList().contains(userName)) {
-                boolean isSuccessCreateUser = OSUtils.createUser(userName);
-
-                String infoLog;
-                if (isSuccessCreateUser) {
-                    infoLog = String.format("create user name success %s", userName);
-                } else {
-                    infoLog = String.format("create user name fail %s", userName);
-                }
-                LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
-                LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
-            }
-        } catch (Throwable e) {
-            LoggerUtils.logError(Optional.ofNullable(logger), e);
-            LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
-        }
-        OSUtils.taskLoggerThreadLocal.remove();
     }
 
     /**
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
index f87628c..a4a39ae 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
@@ -67,9 +67,9 @@ public class FileUtilsTest {
     }
 
     @Test
-    public void testCreateWorkDirAndUserIfAbsent() {
+    public void testCreateWorkDirIfAbsent() {
         try {
-            FileUtils.createWorkDirAndUserIfAbsent("/tmp/createWorkDirAndUserIfAbsent", "test123");
+            FileUtils.createWorkDirIfAbsent("/tmp/createWorkDirAndUserIfAbsent");
             Assert.assertTrue(true);
         } catch (Exception e) {
             Assert.assertTrue(false);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 3fe3b6d..e43a913 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -116,6 +116,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
             logger.error("task execution context is null");
             return;
         }
+
         setTaskCache(taskExecutionContext);
         // custom logger
         Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
@@ -134,7 +135,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
 
         FileUtils.taskLoggerThreadLocal.set(taskLogger);
         try {
-            FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, taskExecutionContext.getTenantCode());
+            FileUtils.createWorkDirIfAbsent(execLocalPath);
         } catch (Throwable ex) {
             String errorLog = String.format("create execLocalPath : %s", execLocalPath);
             LoggerUtils.logError(Optional.of(logger), errorLog, ex);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 6baeae9..c1d03e3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -23,11 +24,11 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.RetryerUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@@ -57,7 +58,6 @@ import org.slf4j.LoggerFactory;
 
 import com.github.rholder.retry.RetryException;
 
-
 /**
  *  task scheduler thread
  */
@@ -113,6 +113,15 @@ public class TaskExecuteThread implements Runnable {
         TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
         try {
             logger.info("script path : {}", taskExecutionContext.getExecutePath());
+            // check if the OS user exists
+            if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
+                String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
+                taskLogger.error(errorLog);
+                responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
+                responseCommand.setEndTime(new Date());
+                return;
+            }
+
             // task node
             TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
 
@@ -199,10 +208,10 @@ public class TaskExecuteThread implements Runnable {
         // the default timeout is the maximum value of the integer
         taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
         TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
-        if (taskTimeoutParameter.getEnable()){
+        if (taskTimeoutParameter.getEnable()) {
             // get timeout strategy
             taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
-            switch (taskTimeoutParameter.getStrategy()){
+            switch (taskTimeoutParameter.getStrategy()) {
                 case WARN:
                     break;
                 case FAILED:
@@ -223,21 +232,19 @@ public class TaskExecuteThread implements Runnable {
         }
     }
 
-
     /**
      *  kill task
      */
-    public void kill(){
-        if (task != null){
+    public void kill() {
+        if (task != null) {
             try {
                 task.cancelApplication(true);
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.error(e.getMessage(),e);
             }
         }
     }
 
-
     /**
      * download resource file
      *
@@ -248,7 +255,7 @@ public class TaskExecuteThread implements Runnable {
     private void downloadResource(String execLocalPath,
                                   Map<String,String> projectRes,
                                   Logger logger) throws Exception {
-        if (MapUtils.isEmpty(projectRes)){
+        if (MapUtils.isEmpty(projectRes)) {
             return;
         }
 
@@ -265,7 +272,7 @@ public class TaskExecuteThread implements Runnable {
 
                     logger.info("get resource file from hdfs :{}", resHdfsPath);
                     HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
-                }catch (Exception e){
+                } catch (Exception e) {
                     logger.error(e.getMessage(),e);
                     throw new RuntimeException(e.getMessage());
                 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
index f0c68d1..86d3f88 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
@@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -33,7 +34,9 @@ import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.TaskManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * test task execute thread.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class})
+@PrepareForTest({TaskManager.class, JSONUtils.class, CommonUtils.class, SpringApplicationContext.class, OSUtils.class})
 public class TaskExecuteThreadTest {
 
     private TaskExecutionContext taskExecutionContext;
@@ -110,6 +113,12 @@ public class TaskExecuteThreadTest {
 
         PowerMockito.mockStatic(CommonUtils.class);
         PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
+
+        List<String> osUserList = new ArrayList<String>() {{
+                add("test");
+            }};
+        PowerMockito.mockStatic(OSUtils.class);
+        PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList);
     }
 
     @Test
@@ -117,6 +126,7 @@ public class TaskExecuteThreadTest {
         taskExecutionContext.setTaskType("SQL");
         taskExecutionContext.setStartTime(new Date());
         taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+        taskExecutionContext.setTenantCode("test");
         TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
         taskExecuteThread.run();
         taskExecutionContext.getCurrentExecutionStatus();
@@ -132,6 +142,7 @@ public class TaskExecuteThreadTest {
         taskExecutionContext.setStartTime(null);
         taskExecutionContext.setDelayTime(1);
         taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
+        taskExecutionContext.setTenantCode("test");
         TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
         taskExecuteThread.run();