You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/21 11:32:13 UTC

[incubator-inlong] branch master updated: [INLONG-3264][Manager] Fix deadlock in stream source (#3265)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ba7e159  [INLONG-3264][Manager] Fix deadlock in stream source (#3265)
ba7e159 is described below

commit ba7e159d8c40f3dc4210f265814af72382175238
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Mar 21 19:32:08 2022 +0800

    [INLONG-3264][Manager] Fix deadlock in stream source (#3265)
---
 .../inlong/manager/service/core/AgentService.java  |  12 +-
 .../service/core/impl/AgentServiceImpl.java        | 129 ++++++++++++---------
 .../service/source/StreamSourceServiceImpl.java    |  27 +++--
 .../manager-web/sql/apache_inlong_manager.sql      |   3 +-
 .../web/controller/openapi/AgentController.java    |   3 +-
 5 files changed, 104 insertions(+), 70 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index 35130a6..f118187 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -43,12 +43,20 @@ public interface AgentService {
     Boolean reportSnapshot(TaskSnapshotRequest request);
 
     /**
-     * Agent report the task result, and pull task config to operate.
+     * Agent reports the task result.
      *
      * @param request Request of the task result.
      * @return Task result.
      */
-    TaskResult reportAndGetTask(TaskRequest request);
+    void report(TaskRequest request);
+
+    /**
+     * Pull task config to operate.
+     *
+     * @param request Request from the agent, carrying its IP and UUID.
+     * @return Task result.
+     */
+    TaskResult getTaskResult(TaskRequest request);
 
     @Deprecated
     FileAgentTaskInfo getFileAgentTask(FileAgentCommandInfo info);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0f4691d..c6c7c68 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.db.CommandEntity;
 import org.apache.inlong.common.pojo.agent.CmdConfig;
@@ -40,6 +41,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo.CommandInfoBean;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
@@ -59,6 +61,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.time.Instant;
@@ -110,84 +113,92 @@ public class AgentServiceImpl implements AgentService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
-    public TaskResult reportAndGetTask(TaskRequest request) {
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    public void report(TaskRequest request) {
         LOGGER.info("begin to get agent task: {}", request);
-        if (request == null || request.getAgentIp() == null) {
+        if (request == null) {
             LOGGER.warn("agent request was empty, just return");
-            return null;
+            return;
         }
-
-        this.updateTaskStatus(request);
-
-        return this.getTaskResult(request);
-    }
-
-    /**
-     * Update the task status by the request
-     */
-    private void updateTaskStatus(TaskRequest request) {
+        Preconditions.checkNotEmpty(request.getAgentIp(),
+                String.format("AgentIp should not be null in request=%s", request));
         if (CollectionUtils.isEmpty(request.getCommandInfo())) {
             LOGGER.warn("task result was empty, just return");
             return;
         }
-
         for (CommandEntity command : request.getCommandInfo()) {
-            Integer taskId = command.getTaskId();
-            StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
-            if (current == null) {
-                continue;
-            }
+            updateCommandEntity(command);
+            // Other tasks with status 20x will change to 30x in next getTaskResult method
+        }
+    }
 
-            LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
-            Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
-            if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
-                LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
-                continue;
-            }
+    public void updateCommandEntity(CommandEntity command) {
+        Integer taskId = command.getTaskId();
+        StreamSourceEntity current = sourceMapper.selectByIdForUpdate(taskId);
+        if (current == null) {
+            return;
+        }
 
-            int result = command.getCommandResult();
-            int previousStatus = current.getStatus();
-            int nextStatus = SourceState.SOURCE_NORMAL.getCode();
-            // Change the status from 30x to normal / disable / frozen
-            if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
-                if (Constants.RESULT_SUCCESS == result) {
-                    if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
-                        nextStatus = SourceState.SOURCE_NORMAL.getCode();
-                    } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
-                        nextStatus = SourceState.SOURCE_DISABLE.getCode();
-                    } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
-                        nextStatus = SourceState.SOURCE_FROZEN.getCode();
-                    }
-                } else if (Constants.RESULT_FAIL == result) {
-                    nextStatus = SourceState.SOURCE_FAILED.getCode();
-                }
+        LocalDateTime localDateTime = LocalDateTime.parse(command.getDeliveryTime(), TIME_FORMATTER);
+        Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
+        if (current.getModifyTime().getTime() - instant.toEpochMilli() > maxModifyTime) {
+            LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
+            return;
+        }
 
-                sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
-                LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
+        int result = command.getCommandResult();
+        int previousStatus = current.getStatus();
+        int nextStatus = SourceState.SOURCE_NORMAL.getCode();
+        // Change the status from 30x to normal / disable / frozen
+        if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
+            if (Constants.RESULT_SUCCESS == result) {
+                if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
+                    nextStatus = SourceState.SOURCE_NORMAL.getCode();
+                } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
+                    nextStatus = SourceState.SOURCE_DISABLE.getCode();
+                } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
+                    nextStatus = SourceState.SOURCE_FROZEN.getCode();
+                }
+            } else if (Constants.RESULT_FAIL == result) {
+                nextStatus = SourceState.SOURCE_FAILED.getCode();
             }
-            // Other tasks with status 20x will change to 30x in next getTaskResult method
+
+            sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
+            LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
         }
     }
 
     /**
      * Get task result by the request
      */
-    private TaskResult getTaskResult(TaskRequest request) {
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
+            propagation = Propagation.REQUIRES_NEW)
+    public TaskResult getTaskResult(TaskRequest request) {
+        if (request == null) {
+            LOGGER.warn("agent request was empty, just return");
+            return null;
+        }
+        Preconditions.checkNotEmpty(request.getAgentIp(),
+                String.format("AgentIp should not be null in request=%s", request));
         // Query the tasks that needed to add or active - without agentIp and uuid
         List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
                 SourceState.TO_BE_ISSUED_ACTIVE.getCode());
-        List<StreamSourceEntity> addList = sourceMapper.selectByStatusForUpdate(addedStatusList);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByStatusForUpdate(addedStatusList);
 
-        // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
-        List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
-                SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
-                SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
-                SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
         String agentIp = request.getAgentIp();
         String uuid = request.getUuid();
-        List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp, uuid);
-        entityList.addAll(addList);
+        if (StringUtils.isNotEmpty(agentIp)) {
+            // Query other tasks by agentIp and uuid - excluding status TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
+            List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
+                    SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
+                    SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
+                    SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
+            List<StreamSourceEntity> agentAddList = sourceMapper.selectByStatusAndIpForUpdate(statusList, agentIp,
+                    uuid);
+            entityList.addAll(agentAddList);
+        }
 
         List<DataConfig> dataConfigs = Lists.newArrayList();
         for (StreamSourceEntity entity : entityList) {
@@ -211,9 +222,13 @@ public class AgentServiceImpl implements AgentService {
         List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
 
         // Update agentIp and uuid for the added and active tasks
-        for (StreamSourceEntity entity : addList) {
-            sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
-            LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid, entity.getId());
+        for (StreamSourceEntity entity : entityList) {
+            if (StringUtils.isNotEmpty(agentIp)
+                    && StringUtils.isEmpty(entity.getAgentIp())) {
+                sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
+                LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid,
+                        entity.getId());
+            }
         }
 
         return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b6ec0f1..3fa64ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
@@ -68,7 +69,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     private CommonOperateService commonOperateService;
 
     @Override
-    @Transactional(rollbackFor = Throwable.class)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
     public Integer save(SourceRequest request, String operator) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("begin to save source info=" + request);
@@ -149,7 +150,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean update(SourceRequest request, String operator) {
         LOGGER.debug("begin to update source info=" + request);
         this.checkParams(request);
@@ -168,7 +170,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean updateStatus(Integer id, Integer targetStatus, String operator) {
         sourceMapper.updateStatus(id, targetStatus, null);
         LOGGER.info("success to update source status={} for id={} by {}", targetStatus, id, operator);
@@ -176,7 +179,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean updateStatus(String groupId, String streamId, Integer targetStatus, String operator) {
         sourceMapper.updateStatusByRelatedId(groupId, streamId, targetStatus);
         LOGGER.info("success to update source status={} for groupId={}, streamId={} by {}",
@@ -185,7 +189,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean delete(Integer id, String sourceType, String operator) {
         LOGGER.info("begin to delete source by id={}, sourceType={}", id, sourceType);
         Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
@@ -204,7 +209,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean restart(Integer id, String sourceType, String operator) {
         LOGGER.info("begin to restart source by id={}, sourceType={}", id, sourceType);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
@@ -221,7 +227,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean stop(Integer id, String sourceType, String operator) {
         LOGGER.info("begin to stop source by id={}, sourceType={}", id, sourceType);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
@@ -238,7 +245,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean logicDeleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
@@ -272,7 +280,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class)
+    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
+            isolation = Isolation.READ_COMMITTED)
     public boolean deleteAll(String groupId, String streamId, String operator) {
         LOGGER.info("begin to delete all source by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index f6fb2d5..fc2ee52 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -611,7 +611,8 @@ CREATE TABLE `stream_source`
     `create_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`)
+    UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
+    KEY `status` (`status`,`is_deleted`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
 
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index 3599da6..0744bfe 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -73,7 +73,8 @@ public class AgentController {
     @PostMapping("/reportAndGetTask")
     @ApiOperation(value = "Report source task snapshot")
     public Response<TaskResult> reportAndGetTask(@RequestBody TaskRequest request) {
-        return Response.success(agentService.reportAndGetTask(request));
+        agentService.report(request);
+        return Response.success(agentService.getTaskResult(request));
     }
 
     @Deprecated