You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/17 03:52:38 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: 1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify (#2201)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new e31b48f  1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify (#2201)
e31b48f is described below

commit e31b48fdb21d0257440b67a1b8b7f0912281237c
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 17 11:52:31 2020 +0800

    1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify (#2201)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1,worker TaskPros use TaskExecutionContext replace
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    * ProcedureTask optimization
    
    * MasterSchedulerService modify
    
    * TaskUpdateQueueConsumer modify
    
    * test
    
    * DataxTask process run debug
    
    * DataxTask process run debug
    
    * add protobuf dependency; MR, Spark tasks etc. need this
    
    * TaskUpdateQueueConsumer modify
    
    * TaskExecutionContextBuilder set TaskInstance workgroup
    
    * WorkerGroupService queryAllGroup modify
    query available work group
    
    * 1,get workergroup from zk modify
    2,SpringConnectionFactory repeat load modify
    
    Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
 .../api/service/WorkerGroupService.java            | 25 +++++++++++++++-------
 .../dao/datasource/SpringConnectionFactory.java    |  1 -
 .../builder/TaskExecutionContextBuilder.java       |  5 +----
 3 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
index 6384e38..8317768 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
@@ -33,10 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * work group service
@@ -184,10 +181,22 @@ public class WorkerGroupService extends BaseService {
      * @return all worker group list
      */
     public Map<String,Object> queryAllGroup() {
-        Map<String, Object> result = new HashMap<>(5);
-        String WORKER_PATH = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
-        List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(WORKER_PATH);
-        result.put(Constants.DATA_LIST, workerGroupList);
+        Map<String, Object> result = new HashMap<>();
+        String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
+        List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
+
+        // available workerGroup list
+        List<String> availableWorkerGroupList = new ArrayList<>();
+
+        for (String workerGroup : workerGroupList){
+            String workerGroupPath= workerPath + "/" + workerGroup;
+            List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
+            if (CollectionUtils.isNotEmpty(childrenNodes)){
+                availableWorkerGroupList.add(workerGroup);
+            }
+        }
+
+        result.put(Constants.DATA_LIST, availableWorkerGroupList);
         putMsg(result, Status.SUCCESS);
         return result;
     }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
index 5788b9d..cb9f22e 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
@@ -123,7 +123,6 @@ public class SpringConnectionFactory {
     @Bean
     public SqlSessionFactory sqlSessionFactory() throws Exception {
         MybatisConfiguration configuration = new MybatisConfiguration();
-        configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
         configuration.setMapUnderscoreToCamelCase(true);
         configuration.setCacheEnabled(false);
         configuration.setCallSettersOnNulls(true);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
index a5e426e..08c105a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
@@ -17,15 +17,12 @@
 
 package org.apache.dolphinscheduler.server.builder;
 
-import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
-import java.util.List;
-
 /**
  *  TaskExecutionContext builder
  */
@@ -51,7 +48,7 @@ public class TaskExecutionContextBuilder {
         taskExecutionContext.setLogPath(taskInstance.getLogPath());
         taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
         taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
-        taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
         return this;
     }