Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/03/21 07:38:07 UTC

[GitHub] [incubator-inlong] kipshi opened a new pull request #3265: [INLONG-3264]Fix deadlock in stream source

kipshi opened a new pull request #3265:
URL: https://github.com/apache/incubator-inlong/pull/3265


   ### Title Name: [INLONG-3264]Fix deadlock in stream source
   
   
   Fixes #3264 
   
   ### Motivation
   
   ### Modifications
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick one of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If the feature is not applicable for documentation, explain why.
     - If the feature is not yet documented in this PR, please create a follow-up issue for adding the documentation.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] gong commented on a change in pull request #3265: [INLONG-3264][Manager] Fix deadlock in stream source

Posted by GitBox <gi...@apache.org>.
gong commented on a change in pull request #3265:
URL: https://github.com/apache/incubator-inlong/pull/3265#discussion_r830831666



##########
File path: inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
##########
@@ -126,68 +127,79 @@ public TaskResult reportAndGetTask(TaskRequest request) {
     /**
      * Update the task status by the request
      */
-    private void updateTaskStatus(TaskRequest request) {
+    protected void updateTaskStatus(TaskRequest request) {
         if (CollectionUtils.isEmpty(request.getCommandInfo())) {
             LOGGER.warn("task result was empty, just return");
             return;
         }
 
         for (CommandEntity command : request.getCommandInfo()) {
-            Integer taskId = command.getTaskId();
-            StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
-            if (current == null) {
-                continue;
-            }
+            updateCommandEntity(command);
+            // Other tasks with status 20x will change to 30x in next getTaskResult method
+        }
+    }
 
-            LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
-            Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
-            if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
-                LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
-                continue;
-            }
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    protected void updateCommandEntity(CommandEntity command) {
+        Integer taskId = command.getTaskId();

Review comment:
       The transactional proxy will not take effect when `updateCommandEntity` is invoked directly from within the same class, so no new transaction is created.
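
In Spring's default proxy mode, `@Transactional` is applied by a proxy wrapped around the bean, so only calls that arrive through that proxy are intercepted. The loop in `updateTaskStatus` calls `updateCommandEntity(command)` on `this`, which reaches the target object directly, so the `REQUIRES_NEW` settings on `updateCommandEntity` are never applied. The stdlib-only sketch below reproduces the mechanism with a JDK dynamic proxy (`java.lang.reflect.Proxy`), one of the two proxy types Spring AOP uses. `CommandService`, `CommandServiceImpl` and `proxy` are hypothetical stand-ins, not InLong's API, and the handler only records method names where a real transaction interceptor would begin and commit a transaction.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SelfInvocationDemo {

    // Hypothetical service interface standing in for a Spring-managed bean.
    public interface CommandService {
        void report(List<Integer> taskIds);

        void updateCommandEntity(int taskId);
    }

    // Names of the calls that went through the proxy, i.e. the calls that
    // would have received transactional advice.
    static final List<String> INTERCEPTED = new ArrayList<>();

    static class CommandServiceImpl implements CommandService {
        @Override
        public void report(List<Integer> taskIds) {
            for (int taskId : taskIds) {
                // Self-invocation: `this` is the raw target object, not the proxy,
                // so the InvocationHandler never sees this call.
                this.updateCommandEntity(taskId);
            }
        }

        @Override
        public void updateCommandEntity(int taskId) {
            // The per-command row update would happen here.
        }
    }

    // Wraps the target the way a proxy-based interceptor does.
    static CommandService proxy(CommandService target) {
        InvocationHandler handler = (proxyInstance, method, args) -> {
            // A transaction interceptor would begin a transaction here and commit
            // or roll back after the call; this sketch only records the method name.
            INTERCEPTED.add(method.getName());
            return method.invoke(target, args);
        };
        return (CommandService) Proxy.newProxyInstance(
                CommandService.class.getClassLoader(),
                new Class<?>[] {CommandService.class},
                handler);
    }

    public static void main(String[] args) {
        CommandService service = proxy(new CommandServiceImpl());
        service.report(Arrays.asList(1, 2, 3));
        System.out.println(INTERCEPTED); // [report]
        service.updateCommandEntity(4);
        System.out.println(INTERCEPTED); // [report, updateCommandEntity]
    }
}
```

Only the external call to `updateCommandEntity` is intercepted; the three calls made from inside `report` are not. Common ways around this are moving the transactional method into a separate bean or using programmatic transactions (for example Spring's `TransactionTemplate`). Note also that, if the project is on Spring 5.x with the standard proxy configuration, `@Transactional` on a non-public method such as the `protected updateCommandEntity` is silently ignored as well.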




[GitHub] [incubator-inlong] healchow commented on a change in pull request #3265: [INLONG-3264][Manager] Fix deadlock in stream source

Posted by GitBox <gi...@apache.org>.
healchow commented on a change in pull request #3265:
URL: https://github.com/apache/incubator-inlong/pull/3265#discussion_r830916787



##########
File path: inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
##########
@@ -110,84 +112,88 @@ public Boolean reportSnapshot(TaskSnapshotRequest request) {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
-    public TaskResult reportAndGetTask(TaskRequest request) {
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    public void report(TaskRequest request) {
         LOGGER.info("begin to get agent task: {}", request);
-        if (request == null || request.getAgentIp() == null) {
+        if (request == null) {
             LOGGER.warn("agent request was empty, just return");
-            return null;
+            return;
         }
-
-        this.updateTaskStatus(request);
-
-        return this.getTaskResult(request);
-    }
-
-    /**
-     * Update the task status by the request
-     */
-    private void updateTaskStatus(TaskRequest request) {
         if (CollectionUtils.isEmpty(request.getCommandInfo())) {
             LOGGER.warn("task result was empty, just return");
             return;
         }
-
         for (CommandEntity command : request.getCommandInfo()) {
-            Integer taskId = command.getTaskId();
-            StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
-            if (current == null) {
-                continue;
-            }
+            updateCommandEntity(command);
+            // Other tasks with status 20x will change to 30x in next getTaskResult method
+        }
+    }
 
-            LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
-            Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
-            if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
-                LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
-                continue;
-            }
+    public void updateCommandEntity(CommandEntity command) {
+        Integer taskId = command.getTaskId();
+        StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
+        if (current == null) {
+            return;
+        }
 
-            int result = command.getCommandResult();
-            int previousStatus = current.getStatus();
-            int nextStatus = SourceState.SOURCE_NORMAL.getCode();
-            // Change the status from 30x to normal / disable / frozen
-            if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
-                if (Constants.RESULT_SUCCESS == result) {
-                    if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
-                        nextStatus = SourceState.SOURCE_NORMAL.getCode();
-                    } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
-                        nextStatus = SourceState.SOURCE_DISABLE.getCode();
-                    } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
-                        nextStatus = SourceState.SOURCE_FROZEN.getCode();
-                    }
-                } else if (Constants.RESULT_FAIL == result) {
-                    nextStatus = SourceState.SOURCE_FAILED.getCode();
-                }
+        LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
+        Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
+        if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
+            LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
+            return;
+        }
 
-                sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
-                LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
+        int result = command.getCommandResult();
+        int previousStatus = current.getStatus();
+        int nextStatus = SourceState.SOURCE_NORMAL.getCode();
+        // Change the status from 30x to normal / disable / frozen
+        if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
+            if (Constants.RESULT_SUCCESS == result) {
+                if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
+                    nextStatus = SourceState.SOURCE_NORMAL.getCode();
+                } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
+                    nextStatus = SourceState.SOURCE_DISABLE.getCode();
+                } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
+                    nextStatus = SourceState.SOURCE_FROZEN.getCode();
+                }
+            } else if (Constants.RESULT_FAIL == result) {
+                nextStatus = SourceState.SOURCE_FAILED.getCode();
             }
-            // Other tasks with status 20x will change to 30x in next getTaskResult method
+
+            sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
+            LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
         }
     }
 
     /**
      * Get task result by the request
      */
-    private TaskResult getTaskResult(TaskRequest request) {
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    public TaskResult getTaskResult(TaskRequest request) {
+        if (request == null) {

Review comment:
       If `request.getAgentIp()` is empty, an exception should be thrown.
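
A minimal sketch of the check the reviewer asks for, assuming a plain `IllegalArgumentException` is acceptable (the project may prefer its own business exception type). `TaskRequest` here is a hypothetical one-field stand-in for InLong's class and `checkAgentIp` is an illustrative helper name. Unlike the pre-PR `reportAndGetTask`, which logged a warning and returned `null` when the agent IP was `null`, it fails fast on a missing, empty or whitespace-only agent IP. It avoids `String.isBlank()` so it also compiles on Java 8.

```java
import java.util.Objects;

public class AgentRequestValidation {

    // Hypothetical stand-in for TaskRequest; only the field this check needs.
    static class TaskRequest {
        private final String agentIp;

        TaskRequest(String agentIp) {
            this.agentIp = agentIp;
        }

        String getAgentIp() {
            return agentIp;
        }
    }

    // Hypothetical helper: fail fast instead of logging and returning.
    static void checkAgentIp(TaskRequest request) {
        Objects.requireNonNull(request, "agent request must not be null");
        String agentIp = request.getAgentIp();
        // Treat null, empty and whitespace-only values as "empty".
        if (agentIp == null || agentIp.trim().isEmpty()) {
            throw new IllegalArgumentException("agent ip must not be empty");
        }
    }

    public static void main(String[] args) {
        checkAgentIp(new TaskRequest("127.0.0.1")); // accepted, no exception
        try {
            checkAgentIp(new TaskRequest(" "));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```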




[GitHub] [incubator-inlong] healchow merged pull request #3265: [INLONG-3264][Manager] Fix deadlock in stream source

Posted by GitBox <gi...@apache.org>.
healchow merged pull request #3265:
URL: https://github.com/apache/incubator-inlong/pull/3265


   


[GitHub] [incubator-inlong] healchow commented on a change in pull request #3265: [INLONG-3264][Manager] Fix deadlock in stream source

Posted by GitBox <gi...@apache.org>.
healchow commented on a change in pull request #3265:
URL: https://github.com/apache/incubator-inlong/pull/3265#discussion_r830916787




Review comment:
       Add a check for an empty `request.getAgentIp()`.



