You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/09/14 08:00:25 UTC

[incubator-dolphinscheduler] branch dev updated: [Improvement][server] WATERDROP task plug-in optimization in switch case code cleaning. (#3652)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new fad2852  [Improvement][server] WATERDROP task plug-in optimization in switch case code cleaning. (#3652)
fad2852 is described below

commit fad28527685453082cc2950da835d477c9ec5a54
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Mon Sep 14 15:59:56 2020 +0800

    [Improvement][server] WATERDROP task plug-in optimization in switch case code cleaning. (#3652)
    
    * WATERDROP switch case code checkstyle.
    
    * add TaskManagerTest.
    
    * TaskManagerTest checkstyle.
    
    * TaskManagerTest checkstyle.
    
    * TaskManagerTest add pom maven-surefire
    
    * TaskManagerTest update.
---
 .../common/utils/TaskParametersUtils.java          |   3 +-
 .../server/worker/task/TaskManager.java            |  74 +++++++--------
 .../server/worker/task/TaskManagerTest.java        | 103 +++++++++++++++++++++
 pom.xml                                            |   1 +
 4 files changed, 141 insertions(+), 40 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
index 6099a0d..2b40b07 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
@@ -59,9 +59,8 @@ public class TaskParametersUtils {
             switch (EnumUtils.getEnum(TaskType.class, taskType)) {
                 case SUB_PROCESS:
                     return JSONUtils.parseObject(parameter, SubProcessParameters.class);
-                case WATERDROP:
-                    return JSONUtils.parseObject(parameter, ShellParameters.class);
                 case SHELL:
+                case WATERDROP:
                     return JSONUtils.parseObject(parameter, ShellParameters.class);
                 case PROCEDURE:
                     return JSONUtils.parseObject(parameter, ProcedureParameters.class);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
index f98d451..34eea9d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
@@ -14,8 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.worker.task;
 
+package org.apache.dolphinscheduler.server.worker.task;
 
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.utils.EnumUtils;
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
 import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
 import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
+
 import org.slf4j.Logger;
 
 /**
@@ -37,42 +38,39 @@ import org.slf4j.Logger;
  */
 public class TaskManager {
 
-  /**
-   * create new task
-   * @param taskExecutionContext  taskExecutionContext
-   * @param logger    logger
-   * @return AbstractTask
-   * @throws IllegalArgumentException illegal argument exception
-   */
-  public static AbstractTask newTask(TaskExecutionContext taskExecutionContext,
-                                     Logger logger)
-      throws IllegalArgumentException {
-    switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
-        case SHELL:
-        return new ShellTask(taskExecutionContext, logger);
-      case WATERDROP:
-        return new ShellTask(taskExecutionContext, logger);
-      case PROCEDURE:
-        return new ProcedureTask(taskExecutionContext, logger);
-      case SQL:
-        return new SqlTask(taskExecutionContext, logger);
-      case MR:
-        return new MapReduceTask(taskExecutionContext, logger);
-      case SPARK:
-        return new SparkTask(taskExecutionContext, logger);
-      case FLINK:
-        return new FlinkTask(taskExecutionContext, logger);
-      case PYTHON:
-        return new PythonTask(taskExecutionContext, logger);
-      case HTTP:
-        return new HttpTask(taskExecutionContext, logger);
-      case DATAX:
-        return new DataxTask(taskExecutionContext, logger);
-      case SQOOP:
-        return new SqoopTask(taskExecutionContext, logger);
-      default:
-        logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
-        throw new IllegalArgumentException("not support task type");
+    /**
+     * create new task
+     * @param taskExecutionContext  taskExecutionContext
+     * @param logger    logger
+     * @return AbstractTask
+     * @throws IllegalArgumentException illegal argument exception
+     */
+    public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException {
+        switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
+            case SHELL:
+            case WATERDROP:
+                return new ShellTask(taskExecutionContext, logger);
+            case PROCEDURE:
+                return new ProcedureTask(taskExecutionContext, logger);
+            case SQL:
+                return new SqlTask(taskExecutionContext, logger);
+            case MR:
+                return new MapReduceTask(taskExecutionContext, logger);
+            case SPARK:
+                return new SparkTask(taskExecutionContext, logger);
+            case FLINK:
+                return new FlinkTask(taskExecutionContext, logger);
+            case PYTHON:
+                return new PythonTask(taskExecutionContext, logger);
+            case HTTP:
+                return new HttpTask(taskExecutionContext, logger);
+            case DATAX:
+                return new DataxTask(taskExecutionContext, logger);
+            case SQOOP:
+                return new SqoopTask(taskExecutionContext, logger);
+            default:
+                logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
+                throw new IllegalArgumentException("not support task type");
+        }
     }
-  }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
new file mode 100644
index 0000000..058270e
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class})
+public class TaskManagerTest {
+
+    private TaskExecutionContext taskExecutionContext;
+
+    private Logger taskLogger;
+
+    private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
+
+    @Before
+    public void before() {
+        // init task execution context, logger
+        taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setProcessId(12345);
+        taskExecutionContext.setProcessDefineId(1);
+        taskExecutionContext.setProcessInstanceId(1);
+        taskExecutionContext.setTaskInstanceId(1);
+        taskExecutionContext.setTaskType("");
+        taskExecutionContext.setFirstSubmitTime(new Date());
+        taskExecutionContext.setDelayTime(0);
+        taskExecutionContext.setLogPath("/tmp/test.log");
+        taskExecutionContext.setHost("localhost");
+        taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
+
+        taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
+                LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+                taskExecutionContext.getProcessDefineId(),
+                taskExecutionContext.getProcessInstanceId(),
+                taskExecutionContext.getTaskInstanceId()
+        ));
+
+        taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
+        taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+
+        PowerMockito.mockStatic(SpringApplicationContext.class);
+        PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
+                .thenReturn(taskExecutionContextCacheManager);
+    }
+
+    @Test
+    public void testNewTask() {
+
+        taskExecutionContext.setTaskType("SHELL");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("WATERDROP");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("HTTP");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("MR");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("SPARK");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("FLINK");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("PYTHON");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("DATAX");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        taskExecutionContext.setTaskType("SQOOP");
+        Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        //taskExecutionContext.setTaskType(null);
+        //Assert.assertNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+        //taskExecutionContext.setTaskType("XXX");
+        //Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+    }
+}
diff --git a/pom.xml b/pom.xml
index 65cbe62..207518c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -847,6 +847,7 @@
                         <!--<include>**/server/worker/task/datax/DataxTaskTest.java</include>-->
                         <!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
                         <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
+                        <include>**/server/worker/task/TaskManagerTest.java</include>
                         <include>**/server/worker/EnvFileTest.java</include>
                         <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
                         <include>**/service/quartz/cron/CronUtilsTest.java</include>