You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/09/14 08:00:25 UTC
[incubator-dolphinscheduler] branch dev updated:
[Improvement][server] WATERDROP task plug-in optimization in switch case code cleaning. (#3652)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fad2852 [Improvement][server] WATERDROP task plug-in optimization in switch case code cleaning. (#3652)
fad2852 is described below
commit fad28527685453082cc2950da835d477c9ec5a54
Author: zhuangchong <37...@users.noreply.github.com>
AuthorDate: Mon Sep 14 15:59:56 2020 +0800
[Improvement][server] WATERDROP task plug-in optimization in switch case code cleaning. (#3652)
* WATERDROP switch case code checkstyle.
* add TaskManagerTest.
* TaskManagerTest checkstyle.
* TaskManagerTest checkstyle.
* TaskManagerTest add pom maven-surefire
* TaskManagerTest update.
---
.../common/utils/TaskParametersUtils.java | 3 +-
.../server/worker/task/TaskManager.java | 74 +++++++--------
.../server/worker/task/TaskManagerTest.java | 103 +++++++++++++++++++++
pom.xml | 1 +
4 files changed, 141 insertions(+), 40 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
index 6099a0d..2b40b07 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
@@ -59,9 +59,8 @@ public class TaskParametersUtils {
switch (EnumUtils.getEnum(TaskType.class, taskType)) {
case SUB_PROCESS:
return JSONUtils.parseObject(parameter, SubProcessParameters.class);
- case WATERDROP:
- return JSONUtils.parseObject(parameter, ShellParameters.class);
case SHELL:
+ case WATERDROP:
return JSONUtils.parseObject(parameter, ShellParameters.class);
case PROCEDURE:
return JSONUtils.parseObject(parameter, ProcedureParameters.class);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
index f98d451..34eea9d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.worker.task;
+package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
+
import org.slf4j.Logger;
/**
@@ -37,42 +38,39 @@ import org.slf4j.Logger;
*/
public class TaskManager {
- /**
- * create new task
- * @param taskExecutionContext taskExecutionContext
- * @param logger logger
- * @return AbstractTask
- * @throws IllegalArgumentException illegal argument exception
- */
- public static AbstractTask newTask(TaskExecutionContext taskExecutionContext,
- Logger logger)
- throws IllegalArgumentException {
- switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
- case SHELL:
- return new ShellTask(taskExecutionContext, logger);
- case WATERDROP:
- return new ShellTask(taskExecutionContext, logger);
- case PROCEDURE:
- return new ProcedureTask(taskExecutionContext, logger);
- case SQL:
- return new SqlTask(taskExecutionContext, logger);
- case MR:
- return new MapReduceTask(taskExecutionContext, logger);
- case SPARK:
- return new SparkTask(taskExecutionContext, logger);
- case FLINK:
- return new FlinkTask(taskExecutionContext, logger);
- case PYTHON:
- return new PythonTask(taskExecutionContext, logger);
- case HTTP:
- return new HttpTask(taskExecutionContext, logger);
- case DATAX:
- return new DataxTask(taskExecutionContext, logger);
- case SQOOP:
- return new SqoopTask(taskExecutionContext, logger);
- default:
- logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
- throw new IllegalArgumentException("not support task type");
+ /**
+ * create new task
+ * @param taskExecutionContext taskExecutionContext
+ * @param logger logger
+ * @return AbstractTask
+ * @throws IllegalArgumentException illegal argument exception
+ */
+ public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException {
+ switch (EnumUtils.getEnum(TaskType.class,taskExecutionContext.getTaskType())) {
+ case SHELL:
+ case WATERDROP:
+ return new ShellTask(taskExecutionContext, logger);
+ case PROCEDURE:
+ return new ProcedureTask(taskExecutionContext, logger);
+ case SQL:
+ return new SqlTask(taskExecutionContext, logger);
+ case MR:
+ return new MapReduceTask(taskExecutionContext, logger);
+ case SPARK:
+ return new SparkTask(taskExecutionContext, logger);
+ case FLINK:
+ return new FlinkTask(taskExecutionContext, logger);
+ case PYTHON:
+ return new PythonTask(taskExecutionContext, logger);
+ case HTTP:
+ return new HttpTask(taskExecutionContext, logger);
+ case DATAX:
+ return new DataxTask(taskExecutionContext, logger);
+ case SQOOP:
+ return new SqoopTask(taskExecutionContext, logger);
+ default:
+ logger.error("unsupport task type: {}", taskExecutionContext.getTaskType());
+ throw new IllegalArgumentException("not support task type");
+ }
}
- }
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
new file mode 100644
index 0000000..058270e
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task;
+
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Date;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class})
+public class TaskManagerTest {
+
+ private TaskExecutionContext taskExecutionContext;
+
+ private Logger taskLogger;
+
+ private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
+
+ @Before
+ public void before() {
+ // init task execution context, logger
+ taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setProcessId(12345);
+ taskExecutionContext.setProcessDefineId(1);
+ taskExecutionContext.setProcessInstanceId(1);
+ taskExecutionContext.setTaskInstanceId(1);
+ taskExecutionContext.setTaskType("");
+ taskExecutionContext.setFirstSubmitTime(new Date());
+ taskExecutionContext.setDelayTime(0);
+ taskExecutionContext.setLogPath("/tmp/test.log");
+ taskExecutionContext.setHost("localhost");
+ taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
+
+ taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
+ LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+ taskExecutionContext.getProcessDefineId(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId()
+ ));
+
+ taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
+ taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+
+ PowerMockito.mockStatic(SpringApplicationContext.class);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
+ .thenReturn(taskExecutionContextCacheManager);
+ }
+
+ @Test
+ public void testNewTask() {
+
+ taskExecutionContext.setTaskType("SHELL");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("WATERDROP");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("HTTP");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("MR");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("SPARK");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("FLINK");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("PYTHON");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("DATAX");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ taskExecutionContext.setTaskType("SQOOP");
+ Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ //taskExecutionContext.setTaskType(null);
+ //Assert.assertNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ //taskExecutionContext.setTaskType("XXX");
+ //Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
+ }
+}
diff --git a/pom.xml b/pom.xml
index 65cbe62..207518c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -847,6 +847,7 @@
<!--<include>**/server/worker/task/datax/DataxTaskTest.java</include>-->
<!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
+ <include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>