You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@dolphinscheduler.apache.org by ca...@apache.org on 2022/12/06 03:23:26 UTC

[dolphinscheduler] branch dev updated: Fix task log file might not be clear (#13102)

This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8a152aebc7 Fix task log file might not be clear (#13102)
8a152aebc7 is described below

commit 8a152aebc70f57cc400c1cd8b16991817bed0d0e
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Dec 6 11:23:20 2022 +0800

    Fix task log file might not be clear (#13102)
---
 .../api/service/impl/ProcessInstanceServiceImpl.java      | 11 ++++-------
 .../dolphinscheduler/dao/mapper/TaskInstanceMapper.java   |  2 ++
 .../dolphinscheduler/dao/repository/TaskInstanceDao.java  |  1 +
 .../dao/repository/impl/TaskInstanceDaoImpl.java          |  5 +++++
 .../dolphinscheduler/dao/mapper/TaskInstanceMapper.xml    |  6 ++++++
 .../apache/dolphinscheduler/service/log/LogClient.java    | 14 ++++++--------
 .../service/process/ProcessServiceImpl.java               | 15 +++++++++------
 .../dolphinscheduler/service/log/LogClientTest.java       |  2 +-
 8 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 5f46c761ac..be1ff200dd 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -814,18 +814,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
             throw new ServiceException(PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
         }
 
-        try {
-            processService.removeTaskLogFile(processInstanceId);
-        } catch (Exception ex) {
-            // ignore
-            logger.warn("Remove task log file exception, processInstanceId:{}.", processInstanceId, ex);
-        }
-
         // delete database cascade
         int delete = processService.deleteWorkProcessInstanceById(processInstanceId);
 
         processService.deleteAllSubWorkProcessByParentId(processInstanceId);
         processService.deleteWorkProcessMapByParentId(processInstanceId);
+        // We need to remove the task log files before deleting the task instances,
+        // because the task log file paths are queried from the task instances.
+        // If deleting the task instances fails afterwards, the task log files will already have been deleted, which may cause data inconsistency.
+        processService.removeTaskLogFile(processInstanceId);
         taskInstanceDao.deleteByWorkflowInstanceId(processInstanceId);
 
         if (delete > 0) {
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index cd27167bfe..faa481e294 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -155,4 +155,6 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
                                              @Param("status") int status);
 
     void deleteByWorkflowInstanceId(@Param("workflowInstanceId") int workflowInstanceId);
+
+    List<TaskInstance> findByWorkflowInstanceId(@Param("workflowInstanceId") Integer workflowInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
index f132bbca64..4c16a56322 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java
@@ -89,4 +89,5 @@ public interface TaskInstanceDao {
 
     void deleteByWorkflowInstanceId(int workflowInstanceId);
 
+    List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer processInstanceId);
 }
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
index 21058b6d95..aca18171b7 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java
@@ -172,4 +172,9 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao {
         taskInstanceMapper.deleteByWorkflowInstanceId(workflowInstanceId);
     }
 
+    @Override
+    public List<TaskInstance> findTaskInstanceByWorkflowInstanceId(Integer workflowInstanceId) {
+        return taskInstanceMapper.findByWorkflowInstanceId(workflowInstanceId);
+    }
+
 }
diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index 3d988683ff..d3726ebf95 100644
--- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -57,6 +57,12 @@
         and test_flag=#{testFlag}
         order by start_time desc
     </select>
+    <select id="findByWorkflowInstanceId" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+        select
+        <include refid="baseSql"/>
+        from t_ds_task_instance
+        WHERE process_instance_id = #{workflowInstanceId}
+    </select>
     <select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
         select
         <include refid="baseSql"/>
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
index 5eac21452d..f1f1e54a5f 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
@@ -176,17 +176,15 @@ public class LogClient implements AutoCloseable {
      * remove task log
      *
      * @param host host
-     * @param port port
      * @param path path
      * @return remove task status
      */
-    public Boolean removeTaskLog(String host, int port, String path) {
-        logger.info("Remove task log from host: {}, port: {}, logPath {}", host, port, path);
+    public Boolean removeTaskLog(@NonNull Host host, String path) {
+        logger.info("Remove task log from host: {} logPath {}", host, path);
         RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
-        final Host address = new Host(host, port);
         try {
             Command command = request.convert2Command();
-            Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
+            Command response = this.client.sendSync(host, command, LOG_REQUEST_TIMEOUT);
             if (response != null) {
                 RemoveTaskLogResponseCommand taskLogResponse =
                         JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
@@ -196,11 +194,11 @@ public class LogClient implements AutoCloseable {
         } catch (InterruptedException ex) {
             Thread.currentThread().interrupt();
             logger.error(
-                    "Remove task log from host: {}, port: {} logPath: {} error, the current thread has been interrupted",
-                    host, port, path, ex);
+                    "Remove task log from host: {}, logPath: {} error, the current thread has been interrupted",
+                    host, path, ex);
             return false;
         } catch (Exception e) {
-            logger.error("Remove task log from host: {}, port: {} logPath: {} error", host, port, path, e);
+            logger.error("Remove task log from host: {},  logPath: {} error", host, path, e);
             return false;
         }
     }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 226050dcb5..cc40fadcf7 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -510,9 +510,7 @@ public class ProcessServiceImpl implements ProcessService {
      */
     @Override
     public void removeTaskLogFile(Integer processInstanceId) {
-        ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
-        List<TaskInstance> taskInstanceList =
-                taskInstanceDao.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
+        List<TaskInstance> taskInstanceList = taskInstanceDao.findTaskInstanceByWorkflowInstanceId(processInstanceId);
         if (CollectionUtils.isEmpty(taskInstanceList)) {
             return;
         }
@@ -521,9 +519,14 @@ public class ProcessServiceImpl implements ProcessService {
             if (Strings.isNullOrEmpty(taskInstance.getHost())) {
                 continue;
             }
-            Host host = Host.of(taskInstance.getHost());
-            // remove task log from loggerserver
-            logClient.removeTaskLog(host.getIp(), host.getPort(), taskLogPath);
+            try {
+                Host host = Host.of(taskInstance.getHost());
+                logClient.removeTaskLog(host, taskLogPath);
+            } catch (Exception e) {
+                logger.error(
+                        "Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}",
+                        taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e);
+            }
         }
     }
 
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
index 9ccfadd49d..faaad4fd7e 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java
@@ -151,7 +151,7 @@ public class LogClientTest {
                     .thenReturn(command);
 
             LogClient logClient = new LogClient();
-            Boolean status = logClient.removeTaskLog("localhost", 1234, "/log/path");
+            Boolean status = logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path");
             Assertions.assertTrue(status);
         }
     }