You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/17 03:52:38 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify (#2201)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new e31b48f 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify (#2201)
e31b48f is described below
commit e31b48fdb21d0257440b67a1b8b7f0912281237c
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 17 11:52:31 2020 +0800
1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify (#2201)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
.../api/service/WorkerGroupService.java | 25 +++++++++++++++-------
.../dao/datasource/SpringConnectionFactory.java | 1 -
.../builder/TaskExecutionContextBuilder.java | 5 +----
3 files changed, 18 insertions(+), 13 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index 6384e38..8317768 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -33,10 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
* work group service
@@ -184,10 +181,22 @@ public class WorkerGroupService extends BaseService {
* @return all worker group list
*/
public Map<String,Object> queryAllGroup() {
- Map<String, Object> result = new HashMap<>(5);
- String WORKER_PATH = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
- List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(WORKER_PATH);
- result.put(Constants.DATA_LIST, workerGroupList);
+ Map<String, Object> result = new HashMap<>();
+ String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
+ List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
+
+ // available workerGroup list
+ List<String> availableWorkerGroupList = new ArrayList<>();
+
+ for (String workerGroup : workerGroupList){
+ String workerGroupPath= workerPath + "/" + workerGroup;
+ List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
+ if (CollectionUtils.isNotEmpty(childrenNodes)){
+ availableWorkerGroupList.add(workerGroup);
+ }
+ }
+
+ result.put(Constants.DATA_LIST, availableWorkerGroupList);
putMsg(result, Status.SUCCESS);
return result;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
index 5788b9d..cb9f22e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
@@ -123,7 +123,6 @@ public class SpringConnectionFactory {
@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
MybatisConfiguration configuration = new MybatisConfiguration();
- configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.setMapUnderscoreToCamelCase(true);
configuration.setCacheEnabled(false);
configuration.setCallSettersOnNulls(true);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index a5e426e..08c105a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -17,15 +17,12 @@
package org.apache.dolphinscheduler.server.builder;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import java.util.List;
-
/**
* TaskExecutionContext builder
*/
@@ -51,7 +48,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
- taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+ taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
return this;
}