You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/18 12:37:57 UTC

[incubator-inlong] branch master updated: [INLONG-3228][Manager] Deadlock found when trying to get lock in getting stream source (#3229)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1510444  [INLONG-3228][Manager] Deadlock found when trying to get lock in getting stream source (#3229)
1510444 is described below

commit 15104443786fdcd43bf98483a816d155f285fe6c
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 18 20:37:50 2022 +0800

    [INLONG-3228][Manager] Deadlock found when trying to get lock in getting stream source (#3229)
---
 .../inlong/manager/dao/mapper/StreamSourceEntityMapper.java |  4 ++--
 .../src/main/resources/mappers/StreamSourceEntityMapper.xml |  3 +--
 .../inlong/manager/service/core/impl/AgentServiceImpl.java  |  8 ++++++--
 .../manager/service/core/impl/SourceDbServiceImpl.java      | 13 ++++---------
 .../manager/service/core/impl/SourceFileServiceImpl.java    | 13 ++++---------
 .../service/source/AbstractStreamSourceOperation.java       |  2 +-
 .../manager/service/source/StreamSourceServiceImpl.java     |  4 ++--
 7 files changed, 20 insertions(+), 27 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index b18ef34..e1c91d1 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -46,8 +46,8 @@ public interface StreamSourceEntityMapper {
     /**
      * Query valid source list by the given group id, stream id and source name.
      */
-    List<StreamSourceEntity> selectByRelatedIdForUpdate(@Param("groupId") String groupId,
-            @Param("streamId") String streamId, @Param("sourceName") String sourceName);
+    List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+            @Param("sourceName") String sourceName);
 
     /**
      * According to the group id, stream id and source type, query valid source entity list.
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 18d1492..20bffb1 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -252,7 +252,7 @@
         </where>
     </select>
 
-    <select id="selectByRelatedIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
@@ -265,7 +265,6 @@
             <if test="sourceName != null and sourceName != ''">
                 and source_name = #{sourceName, jdbcType=VARCHAR}
             </if>
-            for update
         </where>
     </select>
     <select id="selectByRelatedIdAndTypeForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index f826977..ed42ff4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -112,7 +112,7 @@ public class AgentServiceImpl implements AgentService {
     @Override
     @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
     public TaskResult reportAndGetTask(TaskRequest request) {
-        LOGGER.debug("begin to get agent task: {}", request);
+        LOGGER.info("begin to get agent task: {}", request);
         if (request == null || request.getAgentIp() == null) {
             LOGGER.warn("agent request was empty, just return");
             return null;
@@ -164,6 +164,7 @@ public class AgentServiceImpl implements AgentService {
                 }
 
                 sourceMapper.updateStatus(taskId, nextStatus);
+                LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
             }
             // Other tasks with status 20x will change to 30x in next getTaskResult method
         }
@@ -195,7 +196,9 @@ public class AgentServiceImpl implements AgentService {
             int status = entity.getStatus();
             int op = status % MODULUS_100;
             if (status / MODULUS_100 == UNISSUED_STATUS) {
-                sourceMapper.updateStatus(id, ISSUED_STATUS * MODULUS_100 + op);
+                int nextStatus = ISSUED_STATUS * MODULUS_100 + op;
+                sourceMapper.updateStatus(id, nextStatus);
+                LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, id);
             } else {
                 LOGGER.info("skip task status not in 20x, id={}", id);
                 continue;
@@ -210,6 +213,7 @@ public class AgentServiceImpl implements AgentService {
         // Update agentIp and uuid for the added and active tasks
         for (StreamSourceEntity entity : addList) {
             sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid);
+            LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid, entity.getId());
         }
 
         return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java
index 5fb74a4..6a3be48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java
@@ -96,7 +96,6 @@ public class SourceDbServiceImpl implements SourceDbService {
 
     @Override
     public SourceDbBasicInfo getBasicByIdentifier(String groupId, String streamId) {
-        LOGGER.info("begin to get db data source basic by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
 
@@ -109,7 +108,7 @@ public class SourceDbServiceImpl implements SourceDbService {
         }
         BeanUtils.copyProperties(entity, basicInfo);
 
-        LOGGER.info("success to get db data source basic");
+        LOGGER.debug("success to get db data source basic");
         return basicInfo;
     }
 
@@ -188,7 +187,6 @@ public class SourceDbServiceImpl implements SourceDbService {
 
     @Override
     public SourceDbDetailInfo getDetailById(Integer id) {
-        LOGGER.info("begin to get db data source detail by id={}", id);
         Preconditions.checkNotNull(id, "db data source detail's id is null");
 
         SourceDbDetailEntity entity = dbDetailMapper.selectByPrimaryKey(id);
@@ -198,13 +196,12 @@ public class SourceDbServiceImpl implements SourceDbService {
         }
         SourceDbDetailInfo detailInfo = CommonBeanUtils.copyProperties(entity, SourceDbDetailInfo::new);
 
-        LOGGER.info("success to get db data source detail");
+        LOGGER.debug("success to get db data source detail");
         return detailInfo;
     }
 
     @Override
     public List<SourceDbDetailInfo> listDetailByIdentifier(String groupId, String streamId) {
-        LOGGER.info("begin to list db data source detail by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
 
         List<SourceDbDetailEntity> entities = dbDetailMapper.selectByIdentifier(groupId, streamId);
@@ -215,14 +212,12 @@ public class SourceDbServiceImpl implements SourceDbService {
         }
         List<SourceDbDetailInfo> infoList = CommonBeanUtils.copyListProperties(entities, SourceDbDetailInfo::new);
 
-        LOGGER.info("success to list db data source detail");
+        LOGGER.debug("success to list db data source detail");
         return infoList;
     }
 
     @Override
     public PageInfo<SourceDbDetailListVO> listByCondition(SourceDbDetailPageRequest request) {
-        LOGGER.info("begin to list db data source detail page by {}", request);
-
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<SourceDbDetailEntity> entityPage = (Page<SourceDbDetailEntity>) dbDetailMapper.selectByCondition(request);
         List<SourceDbDetailListVO> detailList = CommonBeanUtils
@@ -232,7 +227,7 @@ public class SourceDbServiceImpl implements SourceDbService {
         PageInfo<SourceDbDetailListVO> page = new PageInfo<>(detailList);
         page.setTotal(entityPage.getTotal());
 
-        LOGGER.info("success to list db data source detail");
+        LOGGER.debug("success to list db data source detail");
         return page;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
index 696d9a5..1647b64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
@@ -96,7 +96,6 @@ public class SourceFileServiceImpl implements SourceFileService {
 
     @Override
     public SourceFileBasicInfo getBasicByIdentifier(String groupId, String streamId) {
-        LOGGER.info("begin to get file data source basic by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
 
@@ -109,7 +108,7 @@ public class SourceFileServiceImpl implements SourceFileService {
         }
         CommonBeanUtils.copyProperties(entity, basicInfo);
 
-        LOGGER.info("success to get file data source basic");
+        LOGGER.debug("success to get file data source basic");
         return basicInfo;
     }
 
@@ -210,7 +209,6 @@ public class SourceFileServiceImpl implements SourceFileService {
 
     @Override
     public SourceFileDetailInfo getDetailById(Integer id) {
-        LOGGER.info("begin to get file data source detail by id={}", id);
         Preconditions.checkNotNull(id, "file data source detail's id is null");
 
         SourceFileDetailEntity entity = fileDetailMapper.selectByPrimaryKey(id);
@@ -220,13 +218,12 @@ public class SourceFileServiceImpl implements SourceFileService {
         }
         SourceFileDetailInfo detailInfo = CommonBeanUtils.copyProperties(entity, SourceFileDetailInfo::new);
 
-        LOGGER.info("success to get file data source detail");
+        LOGGER.debug("success to get file data source detail");
         return detailInfo;
     }
 
     @Override
     public List<SourceFileDetailInfo> listDetailByIdentifier(String groupId, String streamId) {
-        LOGGER.info("begin list file data source detail by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
 
         List<SourceFileDetailEntity> entities = fileDetailMapper.selectByIdentifier(groupId, streamId);
@@ -237,14 +234,12 @@ public class SourceFileServiceImpl implements SourceFileService {
         }
 
         List<SourceFileDetailInfo> infoList = CommonBeanUtils.copyListProperties(entities, SourceFileDetailInfo::new);
-        LOGGER.info("success to list file data source detail");
+        LOGGER.debug("success to list file data source detail");
         return infoList;
     }
 
     @Override
     public PageInfo<SourceFileDetailListVO> listByCondition(SourceFileDetailPageRequest request) {
-        LOGGER.info("begin to list file data source detail page by {}", request);
-
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<SourceFileDetailEntity> page = (Page<SourceFileDetailEntity>) fileDetailMapper.selectByCondition(request);
         List<SourceFileDetailListVO> detailList = CommonBeanUtils.copyListProperties(page, SourceFileDetailListVO::new);
@@ -253,7 +248,7 @@ public class SourceFileServiceImpl implements SourceFileService {
         PageInfo<SourceFileDetailListVO> pageInfo = new PageInfo<>(detailList);
         pageInfo.setTotal(page.getTotal());
 
-        LOGGER.info("success to list file data source detail");
+        LOGGER.debug("success to list file data source detail");
         return pageInfo;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 67235b2..e094904 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -102,7 +102,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
         String groupId = request.getInlongGroupId();
         String streamId = request.getInlongStreamId();
         String sourceName = request.getSourceName();
-        List<StreamSourceEntity> existList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, sourceName);
+        List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
         if (CollectionUtils.isNotEmpty(existList)) {
             String err = "stream source already exists with groupId=%s, streamId=%s, sourceName=%s";
             throw new BusinessException(String.format(err, groupId, streamId, sourceName));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b93e9f9..3b5c092 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -109,7 +109,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
 
-        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
         }
@@ -253,7 +253,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             nextStatus = SourceState.SOURCE_DISABLE.getCode();
         }
         Date now = new Date();
-        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isNotEmpty(entityList)) {
             for (StreamSourceEntity entity : entityList) {
                 Integer id = entity.getId();