You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/18 12:37:57 UTC
[incubator-inlong] branch master updated: [INLONG-3228][Manager] Deadlock found when trying to get lock in getting stream source (#3229)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1510444 [INLONG-3228][Manager] Deadlock found when trying to get lock in getting stream source (#3229)
1510444 is described below
commit 15104443786fdcd43bf98483a816d155f285fe6c
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 18 20:37:50 2022 +0800
[INLONG-3228][Manager] Deadlock found when trying to get lock in getting stream source (#3229)
---
.../inlong/manager/dao/mapper/StreamSourceEntityMapper.java | 4 ++--
.../src/main/resources/mappers/StreamSourceEntityMapper.xml | 3 +--
.../inlong/manager/service/core/impl/AgentServiceImpl.java | 8 ++++++--
.../manager/service/core/impl/SourceDbServiceImpl.java | 13 ++++---------
.../manager/service/core/impl/SourceFileServiceImpl.java | 13 ++++---------
.../service/source/AbstractStreamSourceOperation.java | 2 +-
.../manager/service/source/StreamSourceServiceImpl.java | 4 ++--
7 files changed, 20 insertions(+), 27 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index b18ef34..e1c91d1 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -46,8 +46,8 @@ public interface StreamSourceEntityMapper {
/**
* Query valid source list by the given group id, stream id and source name.
*/
- List<StreamSourceEntity> selectByRelatedIdForUpdate(@Param("groupId") String groupId,
- @Param("streamId") String streamId, @Param("sourceName") String sourceName);
+ List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+ @Param("sourceName") String sourceName);
/**
* According to the group id, stream id and source type, query valid source entity list.
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 18d1492..20bffb1 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -252,7 +252,7 @@
</where>
</select>
- <select id="selectByRelatedIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
@@ -265,7 +265,6 @@
<if test="sourceName != null and sourceName != ''">
and source_name = #{sourceName, jdbcType=VARCHAR}
</if>
- for update
</where>
</select>
<select id="selectByRelatedIdAndTypeForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index f826977..ed42ff4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -112,7 +112,7 @@ public class AgentServiceImpl implements AgentService {
@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public TaskResult reportAndGetTask(TaskRequest request) {
- LOGGER.debug("begin to get agent task: {}", request);
+ LOGGER.info("begin to get agent task: {}", request);
if (request == null || request.getAgentIp() == null) {
LOGGER.warn("agent request was empty, just return");
return null;
@@ -164,6 +164,7 @@ public class AgentServiceImpl implements AgentService {
}
sourceMapper.updateStatus(taskId, nextStatus);
+ LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
}
// Other tasks with status 20x will change to 30x in next getTaskResult method
}
@@ -195,7 +196,9 @@ public class AgentServiceImpl implements AgentService {
int status = entity.getStatus();
int op = status % MODULUS_100;
if (status / MODULUS_100 == UNISSUED_STATUS) {
- sourceMapper.updateStatus(id, ISSUED_STATUS * MODULUS_100 + op);
+ int nextStatus = ISSUED_STATUS * MODULUS_100 + op;
+ sourceMapper.updateStatus(id, nextStatus);
+ LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, id);
} else {
LOGGER.info("skip task status not in 20x, id={}", id);
continue;
@@ -210,6 +213,7 @@ public class AgentServiceImpl implements AgentService {
// Update agentIp and uuid for the added and active tasks
for (StreamSourceEntity entity : addList) {
sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid);
+ LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid, entity.getId());
}
return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java
index 5fb74a4..6a3be48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceDbServiceImpl.java
@@ -96,7 +96,6 @@ public class SourceDbServiceImpl implements SourceDbService {
@Override
public SourceDbBasicInfo getBasicByIdentifier(String groupId, String streamId) {
- LOGGER.info("begin to get db data source basic by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
@@ -109,7 +108,7 @@ public class SourceDbServiceImpl implements SourceDbService {
}
BeanUtils.copyProperties(entity, basicInfo);
- LOGGER.info("success to get db data source basic");
+ LOGGER.debug("success to get db data source basic");
return basicInfo;
}
@@ -188,7 +187,6 @@ public class SourceDbServiceImpl implements SourceDbService {
@Override
public SourceDbDetailInfo getDetailById(Integer id) {
- LOGGER.info("begin to get db data source detail by id={}", id);
Preconditions.checkNotNull(id, "db data source detail's id is null");
SourceDbDetailEntity entity = dbDetailMapper.selectByPrimaryKey(id);
@@ -198,13 +196,12 @@ public class SourceDbServiceImpl implements SourceDbService {
}
SourceDbDetailInfo detailInfo = CommonBeanUtils.copyProperties(entity, SourceDbDetailInfo::new);
- LOGGER.info("success to get db data source detail");
+ LOGGER.debug("success to get db data source detail");
return detailInfo;
}
@Override
public List<SourceDbDetailInfo> listDetailByIdentifier(String groupId, String streamId) {
- LOGGER.info("begin to list db data source detail by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
List<SourceDbDetailEntity> entities = dbDetailMapper.selectByIdentifier(groupId, streamId);
@@ -215,14 +212,12 @@ public class SourceDbServiceImpl implements SourceDbService {
}
List<SourceDbDetailInfo> infoList = CommonBeanUtils.copyListProperties(entities, SourceDbDetailInfo::new);
- LOGGER.info("success to list db data source detail");
+ LOGGER.debug("success to list db data source detail");
return infoList;
}
@Override
public PageInfo<SourceDbDetailListVO> listByCondition(SourceDbDetailPageRequest request) {
- LOGGER.info("begin to list db data source detail page by {}", request);
-
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<SourceDbDetailEntity> entityPage = (Page<SourceDbDetailEntity>) dbDetailMapper.selectByCondition(request);
List<SourceDbDetailListVO> detailList = CommonBeanUtils
@@ -232,7 +227,7 @@ public class SourceDbServiceImpl implements SourceDbService {
PageInfo<SourceDbDetailListVO> page = new PageInfo<>(detailList);
page.setTotal(entityPage.getTotal());
- LOGGER.info("success to list db data source detail");
+ LOGGER.debug("success to list db data source detail");
return page;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
index 696d9a5..1647b64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
@@ -96,7 +96,6 @@ public class SourceFileServiceImpl implements SourceFileService {
@Override
public SourceFileBasicInfo getBasicByIdentifier(String groupId, String streamId) {
- LOGGER.info("begin to get file data source basic by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
@@ -109,7 +108,7 @@ public class SourceFileServiceImpl implements SourceFileService {
}
CommonBeanUtils.copyProperties(entity, basicInfo);
- LOGGER.info("success to get file data source basic");
+ LOGGER.debug("success to get file data source basic");
return basicInfo;
}
@@ -210,7 +209,6 @@ public class SourceFileServiceImpl implements SourceFileService {
@Override
public SourceFileDetailInfo getDetailById(Integer id) {
- LOGGER.info("begin to get file data source detail by id={}", id);
Preconditions.checkNotNull(id, "file data source detail's id is null");
SourceFileDetailEntity entity = fileDetailMapper.selectByPrimaryKey(id);
@@ -220,13 +218,12 @@ public class SourceFileServiceImpl implements SourceFileService {
}
SourceFileDetailInfo detailInfo = CommonBeanUtils.copyProperties(entity, SourceFileDetailInfo::new);
- LOGGER.info("success to get file data source detail");
+ LOGGER.debug("success to get file data source detail");
return detailInfo;
}
@Override
public List<SourceFileDetailInfo> listDetailByIdentifier(String groupId, String streamId) {
- LOGGER.info("begin list file data source detail by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
List<SourceFileDetailEntity> entities = fileDetailMapper.selectByIdentifier(groupId, streamId);
@@ -237,14 +234,12 @@ public class SourceFileServiceImpl implements SourceFileService {
}
List<SourceFileDetailInfo> infoList = CommonBeanUtils.copyListProperties(entities, SourceFileDetailInfo::new);
- LOGGER.info("success to list file data source detail");
+ LOGGER.debug("success to list file data source detail");
return infoList;
}
@Override
public PageInfo<SourceFileDetailListVO> listByCondition(SourceFileDetailPageRequest request) {
- LOGGER.info("begin to list file data source detail page by {}", request);
-
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<SourceFileDetailEntity> page = (Page<SourceFileDetailEntity>) fileDetailMapper.selectByCondition(request);
List<SourceFileDetailListVO> detailList = CommonBeanUtils.copyListProperties(page, SourceFileDetailListVO::new);
@@ -253,7 +248,7 @@ public class SourceFileServiceImpl implements SourceFileService {
PageInfo<SourceFileDetailListVO> pageInfo = new PageInfo<>(detailList);
pageInfo.setTotal(page.getTotal());
- LOGGER.info("success to list file data source detail");
+ LOGGER.debug("success to list file data source detail");
return pageInfo;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 67235b2..e094904 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -102,7 +102,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
String groupId = request.getInlongGroupId();
String streamId = request.getInlongStreamId();
String sourceName = request.getSourceName();
- List<StreamSourceEntity> existList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, sourceName);
+ List<StreamSourceEntity> existList = sourceMapper.selectByRelatedId(groupId, streamId, sourceName);
if (CollectionUtils.isNotEmpty(existList)) {
String err = "stream source already exists with groupId=%s, streamId=%s, sourceName=%s";
throw new BusinessException(String.format(err, groupId, streamId, sourceName));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b93e9f9..3b5c092 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -109,7 +109,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
}
@@ -253,7 +253,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
nextStatus = SourceState.SOURCE_DISABLE.getCode();
}
Date now = new Date();
- List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedIdForUpdate(groupId, streamId, null);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isNotEmpty(entityList)) {
for (StreamSourceEntity entity : entityList) {
Integer id = entity.getId();