You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/02/23 02:10:23 UTC
[incubator-dolphinscheduler] branch dev updated:
[Improvement-#3735] Make task delayed execution more efficient (#4812)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ffcb1c2 [Improvement-#3735] Make task delayed execution more efficient (#4812)
ffcb1c2 is described below
commit ffcb1c22e16107e387ceb21dc5a0faaef0783ae7
Author: guohaozhang <zh...@gmail.com>
AuthorDate: Tue Feb 23 10:10:11 2021 +0800
[Improvement-#3735] Make task delayed execution more efficient (#4812)
* [Improvement-3735] improve implementation of delay task execution
* [Improvement][worker] delay task compatible with dev branch and fix test
Co-authored-by: vanilla111 <11...@qq.com>
---
.../server/worker/WorkerServer.java | 7 +
.../worker/processor/TaskExecuteProcessor.java | 48 ++---
.../server/worker/processor/TaskKillProcessor.java | 8 +
.../server/worker/runner/TaskExecuteThread.java | 48 ++---
.../server/worker/runner/WorkerManagerThread.java | 143 +++++++++++++++
.../worker/processor/TaskCallbackServiceTest.java | 4 +-
.../worker/processor/TaskExecuteProcessorTest.java | 195 +++++++++++++++++++++
.../worker/runner/WorkerManagerThreadTest.java | 187 ++++++++++++++++++++
pom.xml | 2 +
9 files changed, 599 insertions(+), 43 deletions(-)
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 1f138f6..a267b5b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -84,6 +85,9 @@ public class WorkerServer {
@Autowired
private RetryReportTaskStatusThread retryReportTaskStatusThread;
+ @Autowired
+ private WorkerManagerThread workerManagerThread;
+
/**
* worker server startup
*
@@ -119,6 +123,9 @@ public class WorkerServer {
// worker registry
this.workerRegistry.registry();
+ // task execute manager
+ this.workerManagerThread.start();
+
// retry report task status
this.retryReportTaskStatusThread.start();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index fc8b33a..3088080 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
@@ -38,12 +38,12 @@ import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCache
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,11 +58,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/**
- * thread executor service
- */
- private final ExecutorService workerExecService;
-
- /**
* worker config
*/
private final WorkerConfig workerConfig;
@@ -73,20 +68,25 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final TaskCallbackService taskCallbackService;
/**
- * alert client service
+ * alert client service
*/
private AlertClientService alertClientService;
/**
* taskExecutionContextCacheManager
*/
- private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+ private final TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+
+ /*
+ * task execute manager
+ */
+ private final WorkerManagerThread workerManager;
public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
- this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+ this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
}
/**
@@ -101,11 +101,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
public TaskExecuteProcessor(AlertClientService alertClientService) {
- this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
- this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
- this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
- this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
-
+ this();
this.alertClientService = alertClientService;
}
@@ -140,9 +136,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getTaskInstanceId()));
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
- taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
- taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
@@ -163,10 +157,23 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
+ // delay task process
+ long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
+ if (remainTime > 0) {
+ logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
+ taskExecutionContext.setStartTime(null);
+ } else {
+ taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
+ taskExecutionContext.setStartTime(new Date());
+ }
+
this.doAck(taskExecutionContext);
- // submit task
- workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService));
+ // submit task to manager
+ if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))) {
+ logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize());
+ }
}
private void doAck(TaskExecutionContext taskExecutionContext) {
@@ -178,6 +185,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* build ack command
+ *
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteAckCommand
*/
@@ -209,4 +217,4 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
-}
\ No newline at end of file
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
index b41c189..a3665b3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
@@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
@@ -69,10 +70,16 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/
private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+ /*
+ * task execute manager
+ */
+ private final WorkerManagerThread workerManager;
+
public TaskKillProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+ this.workerManager = SpringApplicationContext.getBean(WorkerManagerThread.class);
}
/**
@@ -110,6 +117,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
Integer processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) {
+ workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return Pair.of(true, appIds);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 7216567..05bd806 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.runner;
-import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -51,7 +50,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -62,7 +63,7 @@ import com.github.rholder.retry.RetryException;
/**
* task scheduler thread
*/
-public class TaskExecuteThread implements Runnable {
+public class TaskExecuteThread implements Runnable, Delayed {
/**
* logger
@@ -132,7 +133,6 @@ public class TaskExecuteThread implements Runnable {
// task node
TaskNode taskNode = JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
- delayExecutionIfNeeded();
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
@@ -290,24 +290,6 @@ public class TaskExecuteThread implements Runnable {
}
/**
- * delay execution if needed.
- */
- private void delayExecutionIfNeeded() {
- long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
- taskExecutionContext.getDelayTime() * 60L);
- logger.info("delay execution time: {} s", remainTime < 0 ? 0 : remainTime);
- if (remainTime > 0) {
- try {
- Thread.sleep(remainTime * Constants.SLEEP_TIME_MILLIS);
- } catch (Exception e) {
- logger.error("delay task execution failure, the task will be executed directly. process instance id:{}, task instance id:{}",
- taskExecutionContext.getProcessInstanceId(),
- taskExecutionContext.getTaskInstanceId());
- }
- }
- }
-
- /**
* send an ack to change the status of the task.
*/
private void changeTaskExecutionStatusToRunning() {
@@ -343,4 +325,26 @@ public class TaskExecuteThread implements Runnable {
}
return ackCommand;
}
-}
\ No newline at end of file
+
+ /**
+ * get current TaskExecutionContext
+ * @return TaskExecutionContext
+ */
+ public TaskExecutionContext getTaskExecutionContext() {
+ return this.taskExecutionContext;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+ taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ if (o == null) {
+ return 1;
+ }
+ return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
new file mode 100644
index 0000000..073c948
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.enums.Event;
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Manage tasks
+ */
+@Component
+public class WorkerManagerThread implements Runnable {
+
+ private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
+
+ /**
+ * task queue
+ */
+ private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
+
+ /**
+ * worker config
+ */
+ private final WorkerConfig workerConfig;
+
+ /**
+ * thread executor service
+ */
+ private final ExecutorService workerExecService;
+
+ /**
+ * taskExecutionContextCacheManager
+ */
+ private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
+
+ /**
+ * task callback service
+ */
+ private final TaskCallbackService taskCallbackService;
+
+ public WorkerManagerThread() {
+ this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
+ this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
+ this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
+ this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
+ }
+
+ /**
+ * get queue size
+ *
+ * @return queue size
+ */
+ public int getQueueSize() {
+ return workerExecuteQueue.size();
+ }
+
+ /**
+ * Kill tasks that have not been executed, like delay task
+ * then send Response to Master, update the execution status of task instance
+ */
+ public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
+ workerExecuteQueue.stream()
+ .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId)
+ .forEach(workerExecuteQueue::remove);
+ sendTaskKillResponse(taskInstanceId);
+ }
+
+ /**
+ * kill task before execute , like delay task
+ */
+ private void sendTaskKillResponse(Integer taskInstanceId) {
+ TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
+ if (taskExecutionContext == null) {
+ return;
+ }
+ TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
+ responseCommand.setStatus(ExecutionStatus.KILL.getCode());
+ ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
+ taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+ }
+
+ /**
+ * submit task
+ *
+ * @param taskExecuteThread taskExecuteThread
+ * @return submit result
+ */
+ public boolean offer(TaskExecuteThread taskExecuteThread) {
+ return workerExecuteQueue.offer(taskExecuteThread);
+ }
+
+ public void start() {
+ Thread thread = new Thread(this, this.getClass().getName());
+ thread.start();
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("Worker-Execute-Manager-Thread");
+ TaskExecuteThread taskExecuteThread;
+ while (Stopper.isRunning()) {
+ try {
+ taskExecuteThread = workerExecuteQueue.take();
+ workerExecService.submit(taskExecuteThread);
+ } catch (Exception e) {
+ logger.error("An unexpected interrupt is happened, "
+ + "the exception will be ignored and this thread will continue to run", e);
+ }
+ }
+ }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 5a5561d..bdd723a 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
@@ -79,7 +80,8 @@ import io.netty.channel.Channel;
TaskResponseProcessor.class,
TaskExecuteProcessor.class,
CuratorZookeeperClient.class,
- TaskExecutionContextCacheManagerImpl.class})
+ TaskExecutionContextCacheManagerImpl.class,
+ WorkerManagerThread.class})
public class TaskCallbackServiceTest {
@Autowired
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
new file mode 100644
index 0000000..fef2151
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Date;
+import java.util.concurrent.ExecutorService;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * test task execute processor
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
+ JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
+public class TaskExecuteProcessorTest {
+
+ private TaskExecutionContext taskExecutionContext;
+
+ private TaskCallbackService taskCallbackService;
+
+ private ExecutorService workerExecService;
+
+ private WorkerConfig workerConfig;
+
+ private Command command;
+
+ private Command ackCommand;
+
+ private TaskExecuteRequestCommand taskRequestCommand;
+
+ private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
+
+ private AlertClientService alertClientService;
+
+ private WorkerManagerThread workerManager;
+
+ @Before
+ public void before() throws Exception {
+ // init task execution context
+ taskExecutionContext = getTaskExecutionContext();
+ workerConfig = new WorkerConfig();
+ workerConfig.setWorkerExecThreads(1);
+ workerConfig.setListenPort(1234);
+ command = new Command();
+ command.setType(CommandType.TASK_EXECUTE_REQUEST);
+ ackCommand = new TaskExecuteAckCommand().convert2Command();
+ taskRequestCommand = new TaskExecuteRequestCommand();
+ alertClientService = PowerMockito.mock(AlertClientService.class);
+ workerExecService = PowerMockito.mock(ExecutorService.class);
+ PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
+ .thenReturn(null);
+
+ PowerMockito.mockStatic(ChannelUtils.class);
+ PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
+
+ taskExecutionContextCacheManager = PowerMockito.mock(TaskExecutionContextCacheManagerImpl.class);
+ taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
+ PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+
+ PowerMockito.mockStatic(SpringApplicationContext.class);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
+ .thenReturn(taskCallbackService);
+ PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
+ .thenReturn(workerConfig);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
+ .thenReturn(null);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
+ .thenReturn(taskExecutionContextCacheManager);
+
+ Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+ taskExecutionContext.getProcessDefineId(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId()));
+
+ workerManager = PowerMockito.mock(WorkerManagerThread.class);
+ PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService))).thenReturn(Boolean.TRUE);
+
+ PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class))
+ .thenReturn(workerManager);
+
+ PowerMockito.mockStatic(ThreadUtils.class);
+ PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()))
+ .thenReturn(workerExecService);
+
+ PowerMockito.mockStatic(JsonSerializer.class);
+ PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskExecuteRequestCommand.class))
+ .thenReturn(taskRequestCommand);
+
+ PowerMockito.mockStatic(JSONUtils.class);
+ PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
+ .thenReturn(taskRequestCommand);
+ PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
+ .thenReturn(taskExecutionContext);
+
+ PowerMockito.mockStatic(FileUtils.class);
+ PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
+ taskExecutionContext.getProcessDefineId(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId()))
+ .thenReturn(taskExecutionContext.getExecutePath());
+ PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
+
+ SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(null, null, null, alertClientService);
+ PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments()
+ .thenReturn(simpleTaskExecuteThread);
+ }
+
+ @Test
+ public void testNormalExecution() {
+ TaskExecuteProcessor processor = new TaskExecuteProcessor();
+ processor.process(null, command);
+
+ Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+ }
+
+ @Test
+ public void testDelayExecution() {
+ taskExecutionContext.setDelayTime(1);
+ TaskExecuteProcessor processor = new TaskExecuteProcessor();
+ processor.process(null, command);
+
+ Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
+ }
+
+ public TaskExecutionContext getTaskExecutionContext() {
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setProcessId(12345);
+ taskExecutionContext.setProcessDefineId(1);
+ taskExecutionContext.setProcessInstanceId(1);
+ taskExecutionContext.setTaskInstanceId(1);
+ taskExecutionContext.setTaskType("sql");
+ taskExecutionContext.setFirstSubmitTime(new Date());
+ taskExecutionContext.setDelayTime(0);
+ taskExecutionContext.setLogPath("/tmp/test.log");
+ taskExecutionContext.setHost("localhost");
+ taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
+ return taskExecutionContext;
+ }
+
+ private static class SimpleTaskExecuteThread extends TaskExecuteThread {
+
+ public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, Logger taskLogger, AlertClientService alertClientService) {
+ super(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
+ }
+
+ @Override
+ public void run() {
+ //
+ }
+ }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
new file mode 100644
index 0000000..c6b0493
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThreadTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
+import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
+import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
+import org.apache.dolphinscheduler.server.worker.task.TaskManager;
+import org.apache.dolphinscheduler.service.alert.AlertClientService;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * test worker manager thread.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({
+ Stopper.class,
+ TaskManager.class,
+ JSONUtils.class,
+ CommonUtils.class,
+ SpringApplicationContext.class,
+ OSUtils.class})
+public class WorkerManagerThreadTest {
+
+ private TaskCallbackService taskCallbackService;
+
+ private WorkerManagerThread workerManager;
+
+ private TaskExecutionContext taskExecutionContext;
+
+ private AlertClientService alertClientService;
+
+ private Logger taskLogger;
+
+ @Before
+ public void before() {
+ // init task execution context, logger
+ taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setProcessId(12345);
+ taskExecutionContext.setProcessDefineId(1);
+ taskExecutionContext.setProcessInstanceId(1);
+ taskExecutionContext.setTaskInstanceId(1);
+ taskExecutionContext.setTenantCode("test");
+ taskExecutionContext.setTaskType("");
+ taskExecutionContext.setFirstSubmitTime(new Date());
+ taskExecutionContext.setDelayTime(0);
+ taskExecutionContext.setLogPath("/tmp/test.log");
+ taskExecutionContext.setHost("localhost");
+ taskExecutionContext.setExecutePath("/tmp/dolphinscheduler/exec/process/1/2/3/4");
+
+ Command ackCommand = new TaskExecuteAckCommand().convert2Command();
+ Command responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()).convert2Command();
+
+ taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(
+ LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+ taskExecutionContext.getProcessDefineId(),
+ taskExecutionContext.getProcessInstanceId(),
+ taskExecutionContext.getTaskInstanceId()
+ ));
+
+ TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
+ taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
+
+ alertClientService = PowerMockito.mock(AlertClientService.class);
+ WorkerConfig workerConfig = PowerMockito.mock(WorkerConfig.class);
+ taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
+ PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
+ PowerMockito.doNothing().when(taskCallbackService).sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
+ PowerMockito.mockStatic(SpringApplicationContext.class);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
+ .thenReturn(taskExecutionContextCacheManager);
+ PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
+ .thenReturn(workerConfig);
+ PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
+ .thenReturn(taskCallbackService);
+ PowerMockito.when(workerConfig.getWorkerExecThreads()).thenReturn(5);
+ workerManager = new WorkerManagerThread();
+
+ PowerMockito.mockStatic(TaskManager.class);
+ PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService))
+ .thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
+ PowerMockito.mockStatic(JSONUtils.class);
+ PowerMockito.when(JSONUtils.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class))
+ .thenReturn(new TaskNode());
+ PowerMockito.mockStatic(CommonUtils.class);
+ PowerMockito.when(CommonUtils.getSystemEnvPath()).thenReturn("/user_home/.bash_profile");
+ List<String> osUserList = Collections.singletonList("test");
+ PowerMockito.mockStatic(OSUtils.class);
+ PowerMockito.when(OSUtils.getUserList()).thenReturn(osUserList);
+ PowerMockito.mockStatic(Stopper.class);
+ PowerMockito.when(Stopper.isRunning()).thenReturn(true, false);
+ }
+
+ @Test
+ public void testSendTaskKillResponse() {
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
+ workerManager.offer(taskExecuteThread);
+ Assert.assertEquals(1, workerManager.getQueueSize());
+ workerManager.killTaskBeforeExecuteByInstanceId(1);
+ Assert.assertEquals(0, workerManager.getQueueSize());
+ }
+
+ @Test
+ public void testRun() {
+ TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
+ workerManager.offer(taskExecuteThread);
+ Assert.assertEquals(1, workerManager.getQueueSize());
+ workerManager.run();
+ Assert.assertEquals(0, workerManager.getQueueSize());
+ }
+
+ private static class SimpleTask extends AbstractTask {
+
+ protected SimpleTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+ super(taskExecutionContext, logger);
+ // pid
+ this.processId = taskExecutionContext.getProcessId();
+ }
+
+ @Override
+ public AbstractParameters getParameters() {
+ return null;
+ }
+
+ @Override
+ public void init() {
+
+ }
+
+ @Override
+ public void handle() {
+
+ }
+
+ @Override
+ public void after() {
+
+ }
+
+ @Override
+ public ExecutionStatus getExitStatus() {
+ return ExecutionStatus.SUCCESS;
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index c0974a5..c6880c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -913,6 +913,7 @@
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>
<include>**/server/worker/processor/TaskCallbackServiceTest.java</include>
+ <include>**/server/worker/processor/TaskExecuteProcessorTest.java</include>
<include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
@@ -926,6 +927,7 @@
<include>**/server/worker/task/TaskManagerTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
+ <include>**/server/worker/runner/WorkerManagerThreadTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/process/ProcessServiceTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>