You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/21 09:50:02 UTC

[GitHub] [incubator-inlong] healchow commented on a change in pull request #3265: [INLONG-3264][Manager] Fix deadlock in stream source

healchow commented on a change in pull request #3265:
URL: https://github.com/apache/incubator-inlong/pull/3265#discussion_r830916787



##########
File path: inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
##########
@@ -110,84 +112,88 @@ public Boolean reportSnapshot(TaskSnapshotRequest request) {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
-    public TaskResult reportAndGetTask(TaskRequest request) {
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    public void report(TaskRequest request) {
         LOGGER.info("begin to get agent task: {}", request);
-        if (request == null || request.getAgentIp() == null) {
+        if (request == null) {
             LOGGER.warn("agent request was empty, just return");
-            return null;
+            return;
         }
-
-        this.updateTaskStatus(request);
-
-        return this.getTaskResult(request);
-    }
-
-    /**
-     * Update the task status by the request
-     */
-    private void updateTaskStatus(TaskRequest request) {
         if (CollectionUtils.isEmpty(request.getCommandInfo())) {
             LOGGER.warn("task result was empty, just return");
             return;
         }
-
         for (CommandEntity command : request.getCommandInfo()) {
-            Integer taskId = command.getTaskId();
-            StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
-            if (current == null) {
-                continue;
-            }
+            updateCommandEntity(command);
+            // Other tasks with status 20x will change to 30x in next getTaskResult method
+        }
+    }
 
-            LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
-            Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
-            if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
-                LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
-                continue;
-            }
+    public void updateCommandEntity(CommandEntity command) {
+        Integer taskId = command.getTaskId();
+        StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
+        if (current == null) {
+            return;
+        }
 
-            int result = command.getCommandResult();
-            int previousStatus = current.getStatus();
-            int nextStatus = SourceState.SOURCE_NORMAL.getCode();
-            // Change the status from 30x to normal / disable / frozen
-            if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
-                if (Constants.RESULT_SUCCESS == result) {
-                    if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
-                        nextStatus = SourceState.SOURCE_NORMAL.getCode();
-                    } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
-                        nextStatus = SourceState.SOURCE_DISABLE.getCode();
-                    } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
-                        nextStatus = SourceState.SOURCE_FROZEN.getCode();
-                    }
-                } else if (Constants.RESULT_FAIL == result) {
-                    nextStatus = SourceState.SOURCE_FAILED.getCode();
-                }
+        LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
+        Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
+        if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
+            LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
+            return;
+        }
 
-                sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
-                LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
+        int result = command.getCommandResult();
+        int previousStatus = current.getStatus();
+        int nextStatus = SourceState.SOURCE_NORMAL.getCode();
+        // Change the status from 30x to normal / disable / frozen
+        if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
+            if (Constants.RESULT_SUCCESS == result) {
+                if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
+                    nextStatus = SourceState.SOURCE_NORMAL.getCode();
+                } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
+                    nextStatus = SourceState.SOURCE_DISABLE.getCode();
+                } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
+                    nextStatus = SourceState.SOURCE_FROZEN.getCode();
+                }
+            } else if (Constants.RESULT_FAIL == result) {
+                nextStatus = SourceState.SOURCE_FAILED.getCode();
             }
-            // Other tasks with status 20x will change to 30x in next getTaskResult method
+
+            sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
+            LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
         }
     }
 
     /**
      * Get task result by the request
      */
-    private TaskResult getTaskResult(TaskRequest request) {
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    public TaskResult getTaskResult(TaskRequest request) {
+        if (request == null) {

Review comment:
       Add check if `request.getAgentIp()` is empty.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org