You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/12/06 03:23:26 UTC
[dolphinscheduler] branch dev updated: Fix task log file might not be clear (#13102)
This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 8a152aebc7 Fix task log file might not be clear (#13102)
8a152aebc7 is described below
commit 8a152aebc70f57cc400c1cd8b16991817bed0d0e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Dec 6 11:23:20 2022 +0800
Fix task log file might not be clear (#13102)
---
.../api/service/impl/ProcessInstanceServiceImpl.java | 11 ++++-------
.../dolphinscheduler/dao/mapper/TaskInstanceMapper.java | 2 ++
.../dolphinscheduler/dao/repository/TaskInstanceDao.java | 1 +
.../dao/repository/impl/TaskInstanceDaoImpl.java | 5 +++++
.../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml | 6 ++++++
.../apache/dolphinscheduler/service/log/LogClient.java | 14 ++++++--------
.../service/process/ProcessServiceImpl.java | 15 +++++++++------
.../dolphinscheduler/service/log/LogClientTest.java | 2 +-
8 files changed, 34 insertions(+), 22 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 5f46c761ac..be1ff200dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -814,18 +814,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
}
- try {
- processService.removeTaskLogFile(processInstanceId);
- } catch (Exception ex) {
- // ignore
- logger.warn("Remove task log file exception, processInstanceId:{}.", processInstanceId, ex);
- }
-
// delete database cascade
int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
processService.deleteAllSubWorkProcessByParentId(processInstanceId);
processService.deleteWorkProcessMapByParentId(processInstanceId);
+ // We need to remove the task log files before deleting the task instances,
+ // because the task log files are looked up from the task instances.
+ // If deleting the task instances then fails, the task log files will already have been removed, which may cause data inconsistency.
+ processService.removeTaskLogFile(processInstanceId);
taskInstanceDao.deleteByWorkflowInstanceId(processInstanceId);
if (delete > 0) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index cd27167bfe..faa481e294 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -155,4 +155,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("status") int status);
void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId);
+
+ List<TaskInstance> findByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index f132bbca64..4c16a56322 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -89,4 +89,5 @@ public interface TaskInstanceDao {
void deleteByWorkflowInstanceId(int workflowInstanceId);
+ List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer processInstanceId);
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 21058b6d95..aca18171b7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -172,4 +172,9 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
taskInstanceMapper.deleteByWorkflowInstanceId(workflowInstanceId);
}
+ @Override
+ public List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) {
+ return taskInstanceMapper.findByWorkflowInstanceId(workflowInstanceId);
+ }
+
}
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 3d988683ff..d3726ebf95 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -57,6 +57,12 @@
and test_flag=#{testFlag}
order by start_time desc
</select>
+ <select id="findByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_instance
+ WHERE process_instance_id = #{workflowInstanceId}
+ </select>
<select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
select
<include refid="baseSql"/>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
index 5eac21452d..f1f1e54a5f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
@@ -176,17 +176,15 @@ public class LogClient implements AutoCloseable {
* remove task log
*
* @param host host
- * @param port port
* @param path path
* @return remove task status
*/
- public Boolean removeTaskLog(String host, int port, String path) {
- logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path);
+ public Boolean removeTaskLog(@NonNull Host host, String path) {
+ logger.info("Remove task log from host: {} logPath {}", host, path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
- final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
- Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
+ Command response = this.client.sendSync(host, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RemoveTaskLogResponseCommand taskLogResponse =
JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
@@ -196,11 +194,11 @@ public class LogClient implements AutoCloseable {
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.error(
- "Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted",
- host, port, path, ex);
+ "Remove task log from host: {}, logPath: {} error, the current thread has been interrupted",
+ host, path, ex);
return false;
} catch (Exception e) {
- logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e);
+ logger.error("Remove task log from host: {}, logPath: {} error", host, path, e);
return false;
}
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 226050dcb5..cc40fadcf7 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -510,9 +510,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public void removeTaskLogFile(Integer processInstanceId) {
- ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
- List<TaskInstance> taskInstanceList =
- taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+ List<TaskInstance> taskInstanceList = taskInstanceDao.findTaskInstanceByWorkflowInstanceId(processInstanceId);
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@@ -521,9 +519,14 @@ public class ProcessServiceImpl implements ProcessService {
if (Strings.isNullOrEmpty(taskInstance.getHost())) {
continue;
}
- Host host = Host.of(taskInstance.getHost());
- // remove task log from loggerserver
- logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
+ try {
+ Host host = Host.of(taskInstance.getHost());
+ logClient.removeTaskLog(host, taskLogPath);
+ } catch (Exception e) {
+ logger.error(
+ "Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}",
+ taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e);
+ }
}
}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
index 9ccfadd49d..faaad4fd7e 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
@@ -151,7 +151,7 @@ public class LogClientTest {
.thenReturn(command);
LogClient logClient = new LogClient();
- Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path");
+ Boolean status = logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path");
Assertions.assertTrue(status);
}
}