You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by te...@apache.org on 2020/03/24 07:12:54 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: add
async queue and new a thread reslove taskResponse is faster than taskAck to
db (#2297)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new 62f7d21 add async queue and new a thread reslove taskResponse is faster than taskAck to db (#2297)
62f7d21 is described below
commit 62f7d21bdaa003eb2a8368db7a86c557b95f452d
Author: qiaozhanwei <qi...@outlook.com>
AuthorDate: Tue Mar 24 15:12:48 2020 +0800
add async queue and new a thread reslove taskResponse is faster than taskAck to db (#2297)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatch task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task priority refactor
* remove ITaskQueue
* task priority refactor
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* Encapsulate the parameters required by sqltask
* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization
* AbstractTask modify
* ProcedureTask optimization
* MasterSchedulerService modify
* TaskUpdateQueueConsumer modify
* test
* DataxTask process run debug
* DataxTask process run debug
* add protobuf dependency,MR、Spark task etc need this
* TaskUpdateQueueConsumer modify
* TaskExecutionContextBuilder set TaskInstance workgroup
* WorkerGroupService queryAllGroup modify
query available work group
* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
* master and worker register ip use OSUtils.getHost()
* ProcessInstance host set ip:port format
* worker fault tolerance modify
* Constants and .env modify
* master fault tolerant bug modify
* UT add pom.xml
* timing online modify
* when taskResponse reaches the db faster than taskAck, the task state will be wrong;
add an async queue and a new thread to resolve this problem
* TaskExecutionContext set host
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
.../server/master/manager/TaskEvent.java | 227 +++++++++++++++++++++
.../server/master/manager/TaskManager.java | 121 +++++++++++
.../server/master/processor/TaskAckProcessor.java | 19 +-
.../master/processor/TaskResponseProcessor.java | 15 +-
.../worker/processor/TaskExecuteProcessor.java | 2 +
5 files changed, 373 insertions(+), 11 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
new file mode 100644
index 0000000..5c6740f
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskEvent.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.manager;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+
+import java.util.Date;
+
+/**
+ * task event : data of one ack / response received for a task instance
+ */
+public class TaskEvent {
+
+    /** type value of an ack event */
+    public static final String ACK = "ack";
+    /** type value of a response event */
+    public static final String RESPONSE = "response";
+
+    /**
+     * id of the task instance the event belongs to
+     */
+    private int taskInstanceId;
+
+    /**
+     * worker address (set by receiveAck)
+     */
+    private String workerAddress;
+
+    /**
+     * execution state carried by the event
+     */
+    private ExecutionStatus state;
+
+    /**
+     * start time (set by receiveAck)
+     */
+    private Date startTime;
+
+    /**
+     * end time (set by receiveResponse)
+     */
+    private Date endTime;
+
+    /**
+     * execute path (set by receiveAck)
+     */
+    private String executePath;
+
+    /**
+     * log path (set by receiveAck)
+     */
+    private String logPath;
+
+    /**
+     * processId (set by receiveResponse)
+     */
+    private int processId;
+
+    /**
+     * appIds (set by receiveResponse)
+     */
+    private String appIds;
+
+    /**
+     * event type : ack / response
+     */
+    private String type;
+
+    /**
+     * load the data reported by an ack into this event
+     * @param state state
+     * @param startTime startTime
+     * @param workerAddress workerAddress
+     * @param executePath executePath
+     * @param logPath logPath
+     * @param taskInstanceId taskInstanceId
+     * @param type type
+     */
+    public void receiveAck(ExecutionStatus state,
+                           Date startTime,
+                           String workerAddress,
+                           String executePath,
+                           String logPath,
+                           int taskInstanceId,
+                           String type) {
+        this.type = type;
+        this.taskInstanceId = taskInstanceId;
+        this.workerAddress = workerAddress;
+        this.state = state;
+        this.startTime = startTime;
+        this.executePath = executePath;
+        this.logPath = logPath;
+    }
+
+    /**
+     * load the data reported by a response into this event
+     * @param state state
+     * @param endTime endTime
+     * @param processId processId
+     * @param appIds appIds
+     * @param taskInstanceId taskInstanceId
+     * @param type type
+     */
+    public void receiveResponse(ExecutionStatus state,
+                                Date endTime,
+                                int processId,
+                                String appIds,
+                                int taskInstanceId,
+                                String type) {
+        this.type = type;
+        this.taskInstanceId = taskInstanceId;
+        this.state = state;
+        this.endTime = endTime;
+        this.processId = processId;
+        this.appIds = appIds;
+    }
+
+    public int getTaskInstanceId() {
+        return this.taskInstanceId;
+    }
+
+    public void setTaskInstanceId(int taskInstanceId) {
+        this.taskInstanceId = taskInstanceId;
+    }
+
+    public String getWorkerAddress() {
+        return this.workerAddress;
+    }
+
+    public void setWorkerAddress(String workerAddress) {
+        this.workerAddress = workerAddress;
+    }
+
+    public ExecutionStatus getState() {
+        return this.state;
+    }
+
+    public void setState(ExecutionStatus state) {
+        this.state = state;
+    }
+
+    public Date getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(Date startTime) {
+        this.startTime = startTime;
+    }
+
+    public Date getEndTime() {
+        return this.endTime;
+    }
+
+    public void setEndTime(Date endTime) {
+        this.endTime = endTime;
+    }
+
+    public String getExecutePath() {
+        return this.executePath;
+    }
+
+    public void setExecutePath(String executePath) {
+        this.executePath = executePath;
+    }
+
+    public String getLogPath() {
+        return this.logPath;
+    }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public int getProcessId() {
+        return this.processId;
+    }
+
+    public void setProcessId(int processId) {
+        this.processId = processId;
+    }
+
+    public String getAppIds() {
+        return this.appIds;
+    }
+
+    public void setAppIds(String appIds) {
+        this.appIds = appIds;
+    }
+
+    public String getType() {
+        return this.type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("TaskEvent{");
+        text.append("taskInstanceId=").append(taskInstanceId);
+        text.append(", workerAddress='").append(workerAddress).append('\'');
+        text.append(", state=").append(state);
+        text.append(", startTime=").append(startTime);
+        text.append(", endTime=").append(endTime);
+        text.append(", executePath='").append(executePath).append('\'');
+        text.append(", logPath='").append(logPath).append('\'');
+        text.append(", processId=").append(processId);
+        text.append(", appIds='").append(appIds).append('\'');
+        return text.append(", type='").append(type).append('\'').append('}').toString();
+    }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
new file mode 100644
index 0000000..a2710ee
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/manager/TaskManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.manager;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * task manager
+ */
+@Component
+public class TaskManager {
+
+ /**
+ * logger
+ */
+ private static final Logger logger = LoggerFactory.getLogger(TaskManager.class);
+
+ /**
+ * attemptQueue
+ */
+ private final BlockingQueue<TaskEvent> attemptQueue = new LinkedBlockingQueue<>(5000);
+
+
+ /**
+ * process service
+ */
+ @Autowired
+ private ProcessService processService;
+
+
+ @PostConstruct
+ public void init(){
+ TaskWorker taskWorker = new TaskWorker();
+ taskWorker.start();
+ }
+
+ /**
+ * put task to attemptQueue
+ *
+ * @param taskEvent taskEvent
+ */
+ public void putTask(TaskEvent taskEvent){
+ try {
+ attemptQueue.put(taskEvent);
+ } catch (InterruptedException e) {
+ logger.error("put task : {} error :{}",taskEvent,e);
+ }
+ }
+
+
+ /**
+ * task worker thread
+ */
+ class TaskWorker extends Thread {
+
+ @Override
+ public void run() {
+
+ while (Stopper.isRunning()){
+ try {
+ if (attemptQueue.size() == 0){
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ continue;
+ }
+ TaskEvent taskEvent = attemptQueue.take();
+
+ persist(taskEvent);
+
+ }catch (Exception e){
+ logger.error("persist task error",e);
+ }
+ }
+ }
+
+ /**
+ * persist taskEvent
+ * @param taskEvent taskEvent
+ */
+ private void persist(TaskEvent taskEvent){
+ if (TaskEvent.ACK.equals(taskEvent.getType())){
+ processService.changeTaskState(taskEvent.getState(),
+ taskEvent.getStartTime(),
+ taskEvent.getWorkerAddress(),
+ taskEvent.getExecutePath(),
+ taskEvent.getLogPath(),
+ taskEvent.getTaskInstanceId());
+ }else if (TaskEvent.RESPONSE.equals(taskEvent.getType())){
+ processService.changeTaskState(taskEvent.getState(),
+ taskEvent.getEndTime(),
+ taskEvent.getProcessId(),
+ taskEvent.getAppIds(),
+ taskEvent.getTaskInstanceId());
+ }
+ }
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
index 8f0b731..a678cad 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
@@ -28,6 +28,8 @@ import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
+import org.apache.dolphinscheduler.server.master.manager.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
/**
* process service
*/
- private final ProcessService processService;
+ private final TaskManager taskManager;
/**
* taskInstance cache manager
@@ -51,7 +53,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskAckProcessor(){
- this.processService = SpringApplicationContext.getBean(ProcessService.class);
+ this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
@@ -69,15 +71,18 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
- /**
- * change Task state
- */
- processService.changeTaskState(ExecutionStatus.of(taskAckCommand.getStatus()),
+
+ // TaskEvent
+ TaskEvent taskEvent = new TaskEvent();
+ taskEvent.receiveAck(ExecutionStatus.of(taskAckCommand.getStatus()),
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
- taskAckCommand.getTaskInstanceId());
+ taskAckCommand.getTaskInstanceId(),
+ TaskEvent.ACK);
+
+ taskManager.putTask(taskEvent);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index 93ca4ab..ffc5d72 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
+import org.apache.dolphinscheduler.server.master.manager.TaskEvent;
+import org.apache.dolphinscheduler.server.master.manager.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
@@ -42,7 +44,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
/**
* process service
*/
- private final ProcessService processService;
+ private final TaskManager taskManager;
/**
* taskInstance cache manager
@@ -50,7 +52,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskResponseProcessor(){
- this.processService = SpringApplicationContext.getBean(ProcessService.class);
+ this.taskManager = SpringApplicationContext.getBean(TaskManager.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
}
@@ -70,11 +72,16 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
- processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
+ // TaskEvent
+ TaskEvent taskEvent = new TaskEvent();
+ taskEvent.receiveResponse(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
- responseCommand.getTaskInstanceId());
+ responseCommand.getTaskInstanceId(),
+ TaskEvent.RESPONSE);
+
+ taskManager.putTask(taskEvent);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index 80ba649..ed47613 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -24,6 +24,7 @@ import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
@@ -86,6 +87,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
+ taskExecutionContext.setHost(OSUtils.getHost() + ":" + workerConfig.getListenPort());
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);