You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/21 07:54:57 UTC

[GitHub] [incubator-inlong] gong commented on a change in pull request #3265: [INLONG-3264][Manager] Fix deadlock in stream source

gong commented on a change in pull request #3265:
URL: https://github.com/apache/incubator-inlong/pull/3265#discussion_r830831666



##########
File path: inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
##########
@@ -126,68 +127,79 @@ public TaskResult reportAndGetTask(TaskRequest request) {
     /**
      * Update the task status by the request
      */
-    private void updateTaskStatus(TaskRequest request) {
+    protected void updateTaskStatus(TaskRequest request) {
         if (CollectionUtils.isEmpty(request.getCommandInfo())) {
             LOGGER.warn("task result was empty, just return");
             return;
         }
 
         for (CommandEntity command : request.getCommandInfo()) {
-            Integer taskId = command.getTaskId();
-            StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
-            if (current == null) {
-                continue;
-            }
+            updateCommandEntity(command);
+            // Other tasks with status 20x will change to 30x in next getTaskResult method
+        }
+    }
 
-            LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
-            Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
-            if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
-                LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
-                continue;
-            }
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    protected void updateCommandEntity(CommandEntity command) {
+        Integer taskId = command.getTaskId();

Review comment:
       It not will generate proxy transactional when directly invoke function `updateCommandEntity`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org