You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/24 07:12:54 UTC

[incubator-dolphinscheduler] branch refactor-worker updated: add async queue and a new thread to resolve taskResponse reaching the db faster than taskAck (#2297)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/refactor-worker by this push:
     new 62f7d21  add async queue and a new thread to resolve taskResponse reaching the db faster than taskAck (#2297)
62f7d21 is described below

commit 62f7d21bdaa003eb2a8368db7a86c557b95f452d
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 24 15:12:48 2020 +0800

    add async queue and a new thread to resolve taskResponse reaching the db faster than taskAck (#2297)
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * 1, master persistent task
    2. extract  master and worker communication model
    
    * add license
    
    * modify javadoc error
    
    * TaskExecutionContext create modify
    
    * buildAckCommand taskInstanceId not set modify
    
    * java doc error modify
    
    * add comment
    
    * ExecutorManager interface add generic type
    
    * add TaskInstanceCacheManager receive Worker report result
    
    * TaskInstance setExecutePath
    
    * add TaskInstanceCacheManager to receive Worker Task result report
    
    * TaskInstanceCacheManager add remove method
    
    * add license
    
    * add dispatch task method
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * AbstractCommandExecutor remove db access
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * taskInstanceCache is null ,need load from db
    
    * 1, worker: replace TaskProps with TaskExecutionContext
    2,Master kill Task , KillTaskProcessor modify
    
    * worker remove db
    
    * ShellTask modify
    
    * master persistence processId and appIds
    
    * master persistence processId and appIds
    
    * master add kill task logic
    
    * master add kill task logic
    
    * master add kill task logic
    
    * javadoc error modify
    
    * remove chinese log
    
    * executeDirectly method add Override
    
    * remote module modify
    
    * TaskKillResponseProcessor command type modify
    
    * create buildKillCommand
    
    * host add host:port format
    
    * host add host:port format
    
    * TaskAckProcessor modify
    
    * TaskAckProcessor modify
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * task priority refactor
    
    * remove ITaskQueue
    
    * TaskPriority refactor
    
    * remove logs
    
    * WorkerServer refactor
    
    * MasterSchedulerService modify
    
    * WorkerConfig listen port modify
    
    * modify master and worker listen port
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * cancelTaskInstance set TaskExecutionContext host,logPath,executePath
    
    * Encapsulate the parameters required by sqltask
    
    * 1,Encapsulate the parameters required by sqltask
    2,SQLTask optimization
    
    * AbstractTask modify
    
    * ProcedureTask optimization
    
    * MasterSchedulerService modify
    
    * TaskUpdateQueueConsumer modify
    
    * test
    
    * DataxTask process run debug
    
    * DataxTask process run debug
    
    * add protobuf dependency,MR、Spark task etc need this
    
    * TaskUpdateQueueConsumer modify
    
    * TaskExecutionContextBuilder set TaskInstance workgroup
    
    * WorkerGroupService queryAllGroup modify
    query available work group
    
    * 1,get workergroup from zk modify
    2,SpringConnectionFactory repeat load modify
    
    * master and worker register ip  use OSUtils.getHost()
    
    * ProcessInstance host set ip:port format
    
    * worker fault tolerance modify
    
    * Constants and .env modify
    
    * master fault tolerant bug modify
    
    * UT add pom.xml
    
    * timing online  modify
    
    * when taskResponse is written to the db before taskAck, the task state ends up wrong;
    add an async queue and a new thread to resolve this problem
    
    * TaskExecutionContext set host
    
    Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
 .../server/master/manager/TaskEvent.java           | 227 +++++++++++++++++++++
 .../server/master/manager/TaskManager.java         | 121 +++++++++++
 .../server/master/processor/TaskAckProcessor.java  |  19 +-
 .../master/processor/TaskResponseProcessor.java    |  15 +-
 .../worker/processor/TaskExecuteProcessor.java     |   2 +
 5 files changed, 373 insertions(+), 11 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
new file mode 100644
index 0000000..5c6740f
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.manager;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+
+import java.util.Date;
+
+/**
+ * Task state change reported by a worker: either an ack ({@link #ACK},
+ * filled via {@link #receiveAck}) or a response ({@link #RESPONSE},
+ * filled via {@link #receiveResponse}).
+ */
+public class TaskEvent {
+
+    /** type value used for ack events */
+    public static final String ACK = "ack";
+
+    /** type value used for response events */
+    public static final String RESPONSE = "response";
+
+    /** id of the task instance this event refers to */
+    private int taskInstanceId;
+
+    /** address of the reporting worker (set by receiveAck) */
+    private String workerAddress;
+
+    /** execution status carried by the event */
+    private ExecutionStatus state;
+
+    /** task start time (set by receiveAck) */
+    private Date startTime;
+
+    /** task end time (set by receiveResponse) */
+    private Date endTime;
+
+    /** task execute path (set by receiveAck) */
+    private String executePath;
+
+    /** task log path (set by receiveAck) */
+    private String logPath;
+
+    /** process id (set by receiveResponse) */
+    private int processId;
+
+    /** application ids (set by receiveResponse) */
+    private String appIds;
+
+    /** event type, expected to be {@link #ACK} or {@link #RESPONSE} */
+    private String type;
+
+
+    /**
+     * Populate this event from a worker ack.
+     * Response-only fields (endTime, processId, appIds) are not touched.
+     *
+     * @param state state
+     * @param startTime startTime
+     * @param workerAddress workerAddress
+     * @param executePath executePath
+     * @param logPath logPath
+     * @param taskInstanceId taskInstanceId
+     * @param type type
+     */
+    public void receiveAck(ExecutionStatus state,
+                           Date startTime,
+                           String workerAddress,
+                           String executePath,
+                           String logPath,
+                           int taskInstanceId,
+                           String type){
+        this.taskInstanceId = taskInstanceId;
+        this.type = type;
+        this.state = state;
+        this.workerAddress = workerAddress;
+        this.startTime = startTime;
+        this.executePath = executePath;
+        this.logPath = logPath;
+    }
+
+    /**
+     * Populate this event from a worker response.
+     * Ack-only fields (workerAddress, startTime, executePath, logPath) are not touched.
+     *
+     * @param state state
+     * @param endTime endTime
+     * @param processId processId
+     * @param appIds appIds
+     * @param taskInstanceId taskInstanceId
+     * @param type type
+     */
+    public void receiveResponse(ExecutionStatus state,
+                                Date endTime,
+                                int processId,
+                                String appIds,
+                                int taskInstanceId,
+                                String type){
+        this.taskInstanceId = taskInstanceId;
+        this.type = type;
+        this.state = state;
+        this.endTime = endTime;
+        this.processId = processId;
+        this.appIds = appIds;
+    }
+
+    public int getTaskInstanceId() {
+        return this.taskInstanceId;
+    }
+
+    public void setTaskInstanceId(int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public String getWorkerAddress() {
+        return this.workerAddress;
+    }
+
+    public void setWorkerAddress(String workerAddress) {
+        this.workerAddress = workerAddress;
+    }
+
+    public ExecutionStatus getState() {
+        return this.state;
+    }
+
+    public void setState(ExecutionStatus state) {
+        this.state = state;
+    }
+
+    public Date getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(Date startTime) {
+        this.startTime = startTime;
+    }
+
+    public Date getEndTime() {
+        return this.endTime;
+    }
+
+    public void setEndTime(Date endTime) {
+        this.endTime = endTime;
+    }
+
+    public String getExecutePath() {
+        return this.executePath;
+    }
+
+    public void setExecutePath(String executePath) {
+        this.executePath = executePath;
+    }
+
+    public String getLogPath() {
+        return this.logPath;
+    }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public int getProcessId() {
+        return this.processId;
+    }
+
+    public void setProcessId(int processId) {
+        this.processId = processId;
+    }
+
+    public String getAppIds() {
+        return this.appIds;
+    }
+
+    public void setAppIds(String appIds) {
+        this.appIds = appIds;
+    }
+
+    public String getType() {
+        return this.type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("TaskEvent{");
+        sb.append("taskInstanceId=").append(taskInstanceId)
+          .append(", workerAddress='").append(workerAddress).append('\'')
+          .append(", state=").append(state)
+          .append(", startTime=").append(startTime)
+          .append(", endTime=").append(endTime)
+          .append(", executePath='").append(executePath).append('\'')
+          .append(", logPath='").append(logPath).append('\'')
+          .append(", processId=").append(processId)
+          .append(", appIds='").append(appIds).append('\'')
+          .append(", type='").append(type).append('\'')
+          .append('}');
+        return sb.toString();
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
new file mode 100644
index 0000000..a2710ee
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.manager;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * task manager
+ */
+@Component
+public class TaskManager {
+
+    /**
+     * logger
+     */
+    private static final Logger logger = LoggerFactory.getLogger(TaskManager.class);
+
+    /**
+     * attemptQueue
+     */
+    private final BlockingQueue<TaskEvent> attemptQueue = new LinkedBlockingQueue<>(5000);
+
+
+    /**
+     * process service
+     */
+    @Autowired
+    private ProcessService processService;
+
+
+    @PostConstruct
+    public void init(){
+        TaskWorker taskWorker = new TaskWorker();
+        taskWorker.start();
+    }
+
+    /**
+     * put task to attemptQueue
+     *
+     * @param taskEvent taskEvent
+     */
+    public void putTask(TaskEvent taskEvent){
+        try {
+            attemptQueue.put(taskEvent);
+        } catch (InterruptedException e) {
+            logger.error("put task : {} error :{}",taskEvent,e);
+        }
+    }
+
+
+    /**
+     * task worker thread
+     */
+    class TaskWorker extends Thread {
+
+        @Override
+        public void run() {
+
+            while (Stopper.isRunning()){
+                try {
+                    if (attemptQueue.size() == 0){
+                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+                        continue;
+                    }
+                    TaskEvent taskEvent = attemptQueue.take();
+
+                    persist(taskEvent);
+
+                }catch (Exception e){
+                    logger.error("persist task error",e);
+                }
+            }
+        }
+
+        /**
+         * persist  taskEvent
+         * @param taskEvent taskEvent
+         */
+        private void persist(TaskEvent taskEvent){
+            if (TaskEvent.ACK.equals(taskEvent.getType())){
+                processService.changeTaskState(taskEvent.getState(),
+                        taskEvent.getStartTime(),
+                        taskEvent.getWorkerAddress(),
+                        taskEvent.getExecutePath(),
+                        taskEvent.getLogPath(),
+                        taskEvent.getTaskInstanceId());
+            }else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){
+                processService.changeTaskState(taskEvent.getState(),
+                        taskEvent.getEndTime(),
+                        taskEvent.getProcessId(),
+                        taskEvent.getAppIds(),
+                        taskEvent.getTaskInstanceId());
+            }
+        }
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 8f0b731..a678cad 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
+import org.apache.dolphinscheduler.server.master.manager.TaskManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
     /**
      * process service
      */
-    private final ProcessService processService;
+    private final TaskManager taskManager;
 
     /**
      * taskInstance cache manager
@@ -51,7 +53,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
     private final TaskInstanceCacheManager taskInstanceCacheManager;
 
     public TaskAckProcessor(){
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
+        this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
     }
 
@@ -69,15 +71,18 @@ public class TaskAckProcessor implements NettyRequestProcessor {
         taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
 
         String workerAddress = ChannelUtils.toAddress(channel).getAddress();
-        /**
-         * change Task state
-         */
-        processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
+
+        // TaskEvent
+        TaskEvent taskEvent = new TaskEvent();
+        taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()),
                 taskAckCommand.getStartTime(),
                 workerAddress,
                 taskAckCommand.getExecutePath(),
                 taskAckCommand.getLogPath(),
-                taskAckCommand.getTaskInstanceId());
+                taskAckCommand.getTaskInstanceId(),
+                TaskEvent.ACK);
+
+        taskManager.putTask(taskEvent);
 
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 93ca4ab..ffc5d72 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
 import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
+import org.apache.dolphinscheduler.server.master.manager.TaskManager;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
@@ -42,7 +44,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
     /**
      * process service
      */
-    private final ProcessService processService;
+    private final TaskManager taskManager;
 
     /**
      * taskInstance cache manager
@@ -50,7 +52,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
     private final TaskInstanceCacheManager taskInstanceCacheManager;
 
     public TaskResponseProcessor(){
-        this.processService = SpringApplicationContext.getBean(ProcessService.class);
+        this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
         this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
     }
 
@@ -70,11 +72,16 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
 
         taskInstanceCacheManager.cacheTaskInstance(responseCommand);
 
-        processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
+        // TaskEvent
+        TaskEvent taskEvent = new TaskEvent();
+        taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()),
                 responseCommand.getEndTime(),
                 responseCommand.getProcessId(),
                 responseCommand.getAppIds(),
-                responseCommand.getTaskInstanceId());
+                responseCommand.getTaskInstanceId(),
+                TaskEvent.RESPONSE);
+
+        taskManager.putTask(taskEvent);
     }
 
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 80ba649..ed47613 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -24,6 +24,7 @@ import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.FileUtils;
@@ -86,6 +87,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
         String contextJson = taskRequestCommand.getTaskExecutionContext();
 
         TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
+        taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
 
         // local execute path
         String execLocalPath = getExecLocalPath(taskExecutionContext);