Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/28 14:34:53 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: 1,add UT in pom 2,refactor TaskUpdateQueue (#2326)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 68cb81f 1,add UT in pom 2,refactor TaskUpdateQueue (#2326)
68cb81f is described below
commit 68cb81fdf5e62c95444f22837545c3c3b8604ba3
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Sat Mar 28 22:34:47 2020 +0800
1,add UT in pom 2,refactor TaskUpdateQueue (#2326)
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* taskInstanceCache is null, need to load from db
* 1,worker: replace TaskPros with TaskExecutionContext
2,Master kill Task, KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master add kill task logic
* javadoc error modify
* remove Chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* add protobuf dependency; MR, Spark tasks etc. need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
* master fault tolerant bug modify
* UT add pom.xml
* timing online modify
* when taskResponse reaches the db before taskAck, the task state becomes wrong;
add an async queue and a new thread to resolve this problem
* TaskExecutionContext set host
* 1,TaskManager refactor
2, api start load server dolphinscheduler-daemon.sh modify
* add UT in pom.xml
* revert dolphinscheduler-daemon.sh
* ZookeeperRegister use common.properties zookeeperRoot path
* api start exclude org.apache.dolphinscheduler.server.*
* ZookeeperRegister use common.properties zookeeperRoot path
* 1,api start load server filter
2,SHELL task exitStatusCode modify
* java doc error modify
* remove todo
* add UT in pom
* 1,add UT in pom
2,refactor TaskUpdateQueue
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
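Note on the taskAck/taskResponse entry in the commit message: the message says the task state goes wrong when the response is written to the db before the ack, and that an async queue plus a dedicated thread resolves it. The following is only a minimal sketch of that idea, not code from this commit. The class TaskEventSketch, the Kind enum, the STOP sentinel, and the state strings are all hypothetical, and an in-memory map stands in for the db.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch, not code from this commit: ack and response events
 * go through one FIFO queue and are persisted by one consumer thread.
 */
final class TaskEventSketch {

    /** Illustrative event kinds; STOP is a sentinel that ends the consumer. */
    enum Kind { ACK, RESPONSE, STOP }

    /** One queued event for a task instance. */
    private static final class Event {
        final Kind kind;
        final int taskInstanceId;

        Event(Kind kind, int taskInstanceId) {
            this.kind = kind;
            this.taskInstanceId = taskInstanceId;
        }
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    /** Stands in for the task instance table (assumption: real code writes to the db). */
    private final Map<Integer, String> stateByTask = new ConcurrentHashMap<>();

    private final Thread consumer = new Thread(this::drain, "task-event-consumer");

    void start() {
        consumer.setDaemon(true);
        consumer.start();
    }

    /** Called from handler threads; only enqueues, never writes state itself. */
    void submit(Kind kind, int taskInstanceId) {
        queue.add(new Event(kind, taskInstanceId));
    }

    /** Queues the STOP sentinel and waits until all earlier events are applied. */
    void stopAndWait() {
        queue.add(new Event(Kind.STOP, -1));
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    String stateOf(int taskInstanceId) {
        return stateByTask.get(taskInstanceId);
    }

    private void drain() {
        try {
            while (true) {
                Event event = queue.take();
                if (event.kind == Kind.STOP) {
                    return;
                }
                // Single writer: events are applied strictly in the order they were queued.
                stateByTask.put(event.taskInstanceId,
                        event.kind == Kind.ACK ? "RUNNING" : "FINISHED");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        TaskEventSketch sketch = new TaskEventSketch();
        sketch.start();
        sketch.submit(Kind.ACK, 7);
        sketch.submit(Kind.RESPONSE, 7);
        sketch.stopAndWait();
        System.out.println(sketch.stateOf(7)); // prints FINISHED
    }
}
```

Because a single thread drains the queue, two handler threads can no longer race each other on the write; the trade-off is that persistence becomes asynchronous, so callers must not assume the state is stored when submit returns.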
.../api/controller/ProcessInstanceController.java | 1 -
.../api/service/DataAnalysisServiceTest.java | 2 -
...onsumer.java => TaskPriorityQueueConsumer.java} | 107 +++++++++++++--------
.../master/runner/MasterBaseTaskExecThread.java | 8 +-
...TaskUpdateQueue.java => TaskPriorityQueue.java} | 2 +-
...teQueueImpl.java => TaskPriorityQueueImpl.java} | 4 +-
.../service/queue/TaskQueueFactory.java | 55 -----------
.../src/test/java/queue/TaskUpdateQueueTest.java | 6 +-
pom.xml | 23 ++---
9 files changed, 85 insertions(+), 123 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index b6533ad..2fd332f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import io.swagger.annotations.*;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
index 10220e2..35cc6ae 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataAnalysisServiceTest.java
@@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -46,7 +45,6 @@ import java.util.List;
import java.util.Map;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({TaskQueueFactory.class})
public class DataAnalysisServiceTest {
@InjectMocks
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
similarity index 69%
rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 077b1ef..4aaf901 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -40,7 +40,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -53,18 +53,18 @@ import java.util.List;
* TaskUpdateQueue consumer
*/
@Component
-public class TaskUpdateQueueConsumer extends Thread{
+public class TaskPriorityQueueConsumer extends Thread{
/**
* logger of TaskUpdateQueueConsumer
*/
- private static final Logger logger = LoggerFactory.getLogger(TaskUpdateQueueConsumer.class);
+ private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);
/**
* taskUpdateQueue
*/
@Autowired
- private TaskUpdateQueue taskUpdateQueue;
+ private TaskPriorityQueue taskUpdateQueue;
/**
* processService
@@ -155,52 +155,19 @@ public class TaskUpdateQueueConsumer extends Thread{
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// SQL task
if (taskType == TaskType.SQL){
- SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
- int datasourceId = sqlParameters.getDatasource();
- DataSource datasource = processService.findDataSourceById(datasourceId);
- sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
-
- // whether udf type
- boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
- && StringUtils.isNotEmpty(sqlParameters.getUdfs());
-
- if (udfTypeFlag){
- String[] udfFunIds = sqlParameters.getUdfs().split(",");
- int[] udfFunIdsArray = new int[udfFunIds.length];
- for(int i = 0 ; i < udfFunIds.length;i++){
- udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
- }
-
- List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
- sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
- }
+ setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
}
// DATAX task
if (taskType == TaskType.DATAX){
- DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
-
- DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
- DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
-
-
- dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
- dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
- dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-
- dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
- dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
- dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+ setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
}
// procedure task
if (taskType == TaskType.PROCEDURE){
- ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class);
- int datasourceId = procedureParameters.getDatasource();
- DataSource datasource = processService.findDataSourceById(datasourceId);
- procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+ setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
}
@@ -216,6 +183,66 @@ public class TaskUpdateQueueConsumer extends Thread{
}
/**
+ * set procedure task relation
+ * @param procedureTaskExecutionContext procedureTaskExecutionContext
+ * @param taskNode taskNode
+ */
+ private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) {
+ ProcedureParameters procedureParameters = JSONObject.parseObject(taskNode.getParams(), ProcedureParameters.class);
+ int datasourceId = procedureParameters.getDatasource();
+ DataSource datasource = processService.findDataSourceById(datasourceId);
+ procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+ }
+
+ /**
+ * set datax task relation
+ * @param dataxTaskExecutionContext dataxTaskExecutionContext
+ * @param taskNode taskNode
+ */
+ private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
+ DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class);
+
+ DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource());
+ DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
+
+
+ dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
+ dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+ dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+
+ dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
+ dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+ dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+ }
+
+ /**
+ * set SQL task relation
+ * @param sqlTaskExecutionContext sqlTaskExecutionContext
+ * @param taskNode taskNode
+ */
+ private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) {
+ SqlParameters sqlParameters = JSONObject.parseObject(taskNode.getParams(), SqlParameters.class);
+ int datasourceId = sqlParameters.getDatasource();
+ DataSource datasource = processService.findDataSourceById(datasourceId);
+ sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
+
+ // whether udf type
+ boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
+ && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+
+ if (udfTypeFlag){
+ String[] udfFunIds = sqlParameters.getUdfs().split(",");
+ int[] udfFunIdsArray = new int[udfFunIds.length];
+ for(int i = 0 ; i < udfFunIds.length;i++){
+ udfFunIdsArray[i]=Integer.parseInt(udfFunIds[i]);
+ }
+
+ List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(udfFunIdsArray);
+ sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+ }
+ }
+
+ /**
* get execute local path
*
* @return execute local path
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
index b0fd632..dd7c564 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*;
@@ -76,7 +76,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* taskUpdateQueue
*/
- private TaskUpdateQueue taskUpdateQueue;
+ private TaskPriorityQueue taskUpdateQueue;
/**
* constructor of MasterBaseTaskExecThread
* @param taskInstance task instance
@@ -89,7 +89,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
this.cancel = false;
this.taskInstance = taskInstance;
this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
- this.taskUpdateQueue = SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
+ this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
}
/**
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
similarity index 97%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
index 48f510e..3ad9aef 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueue.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.service.queue;
-public interface TaskUpdateQueue {
+public interface TaskPriorityQueue {
/**
* put task info
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
similarity index 96%
rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 1b3bec7..0a0fb1b 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskUpdateQueueImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.service.queue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.*;
@@ -31,7 +29,7 @@ import static org.apache.dolphinscheduler.common.Constants.*;
* tasks queue implementation
*/
@Service
-public class TaskUpdateQueueImpl implements TaskUpdateQueue {
+public class TaskPriorityQueueImpl implements TaskPriorityQueue {
/**
* queue size
*/
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
deleted file mode 100644
index 3ea3195..0000000
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskQueueFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.service.queue;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * task queue factory
- */
-public class TaskQueueFactory {
-
- private static final Logger logger = LoggerFactory.getLogger(TaskQueueFactory.class);
-
-
- private TaskQueueFactory(){
-
- }
-
-
- /**
- * get instance (singleton)
- *
- * @return instance
- */
- public static TaskUpdateQueue getTaskQueueInstance() {
- String queueImplValue = CommonUtils.getQueueImplValue();
- if (StringUtils.isNotBlank(queueImplValue)) {
- logger.info("task queue impl use zookeeper ");
- return SpringApplicationContext.getBean(TaskUpdateQueueImpl.class);
- }else{
- logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
- System.exit(-1);
- }
-
- return null;
- }
-}
diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
index a0e4fad..ca6c083 100644
--- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
+++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java
@@ -17,8 +17,8 @@
package queue;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueue;
-import org.apache.dolphinscheduler.service.queue.TaskUpdateQueueImpl;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
+import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -45,7 +45,7 @@ public class TaskUpdateQueueTest {
String taskInfo3 = "1_1_0_3_default";
String taskInfo4 = "1_1_0_4_default";
- TaskUpdateQueue queue = new TaskUpdateQueueImpl();
+ TaskPriorityQueue queue = new TaskPriorityQueueImpl();
queue.put(taskInfo1);
queue.put(taskInfo2);
queue.put(taskInfo3);
diff --git a/pom.xml b/pom.xml
index e7b95d9..1ba35b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -717,23 +717,18 @@
<include>**/dao/mapper/AccessTokenMapperTest.java</include>
<include>**/dao/mapper/AlertGroupMapperTest.java</include>
<include>**/dao/mapper/AlertMapperTest.java</include>
- <include>**/dao/mapper/CommandMapperTest.java</include>
- <include>**/dao/cron/CronUtilsTest.java</include>
- <include>**/dao/utils/DagHelperTest.java</include>
- <include>**/server/worker/task/datax/DataxTaskTest.java</include>
- <include>**/server/utils/DataxUtilsTest.java</include>
- <include>**/server/utils/SparkArgsUtilsTest.java</include>
- <include>**/server/utils/FlinkArgsUtilsTest.java</include>
- <include>**/server/utils/ParamUtilsTest.java</include>
+ <include>**/dao/mapper/DataSourceMapperTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include>
<include>**/server/log/SensitiveDataConverterTest.java</include>
<include>**/server/log/TaskLogDiscriminatorTest.java</include>
<include>**/server/log/TaskLogFilterTest.java</include>
<include>**/server/log/WorkerLogFilterTest.java</include>
- <include>**/server/master/executor/NettyExecutorManagerTest.java</include>
- <include>**/server/master/host/LowerWeightRoundRobinTest.java</include>
- <include>**/server/master/host/RandomSelectorTest.java</include>
- <include>**/server/master/host/RoundRobinSelectorTest.java</include>
+ <include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include>
+ <include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
+ <include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
+ <include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
+ <include>**/server/master/dispatch/host/RoundRobinHostManagerTest.java</include>
+ <include>**/server/master/dispatch/ExecutorDispatcherTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
@@ -741,19 +736,19 @@
<include>**/server/master/ParamsTest.java</include>
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
+ <include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
- <include>**/server/worker/register/WorkerRegistryTest.java</include>
+ <include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/worker/task/dependent/DependentTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include>
-
</includes>
<!-- <skip>true</skip> -->
<argLine>-Xmx2048m</argLine>