You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/20 11:09:16 UTC
[incubator-inlong] branch master updated: [INLONG-3252][Manager] Remove repeated-read transaction in the getting source method (#3253)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 86319d8 [INLONG-3252][Manager] Remove repeated-read transaction in the getting source method (#3253)
86319d8 is described below
commit 86319d8fbfca7da9e63e2733cc30052f23daa114
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sun Mar 20 19:09:10 2022 +0800
[INLONG-3252][Manager] Remove repeated-read transaction in the getting source method (#3253)
---
.../dao/mapper/StreamSourceEntityMapper.java | 7 +++--
.../resources/mappers/StreamSourceEntityMapper.xml | 18 ++++++-----
.../service/core/impl/AgentServiceImpl.java | 6 ++--
.../service/core/impl/InlongGroupServiceImpl.java | 3 --
.../service/core/impl/InlongStreamServiceImpl.java | 35 ++--------------------
.../source/AbstractStreamSourceOperation.java | 1 -
.../service/source/SourceSnapshotOperation.java | 9 ++++--
.../service/source/StreamSourceServiceImpl.java | 2 +-
8 files changed, 28 insertions(+), 53 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 9cc4144..b70998b 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.springframework.stereotype.Repository;
+import java.util.Date;
import java.util.List;
@Repository
@@ -89,7 +90,8 @@ public interface StreamSourceEntityMapper {
*
* @apiNote Should not change the modify_time
*/
- int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus);
+ int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus,
+ @Param("modifyTime") Date modifyTime);
/**
* Update the status to `nextStatus` by the given group id and stream id.
@@ -104,7 +106,8 @@ public interface StreamSourceEntityMapper {
*
* @apiNote Should not change the modify_time
*/
- int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+ int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid,
+ @Param("modifyTime") Date modifyTime);
int updateSnapshot(StreamSourceEntity entity);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index b12f803..81d476a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -434,15 +434,16 @@
<update id="updateStatus">
update stream_source
set previous_status = status,
- status = #{nextStatus, jdbcType=INTEGER},
- modify_time = modify_time
+ status = #{nextStatus, jdbcType=INTEGER}
+ <if test="modifyTime != null">
+ ,modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ </if>
where id = #{id, jdbcType=INTEGER}
</update>
<update id="updateStatusByRelatedId">
update stream_source
set previous_status = status,
- status = #{nextStatus, jdbcType=INTEGER},
- modify_time = modify_time
+ status = #{nextStatus, jdbcType=INTEGER}
<where>
inlong_group_id = #{groupId, jdbcType=VARCHAR}
<if test="streamId != null">
@@ -453,15 +454,16 @@
<update id="updateIpAndUuid">
update stream_source
set agent_ip = #{agentIp,jdbcType=VARCHAR},
- uuid = #{uuid,jdbcType=VARCHAR},
- modify_time = modify_time
+ uuid = #{uuid,jdbcType=VARCHAR}
+ <if test="modifyTime != null">
+ ,modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ </if>
where id = #{id, jdbcType=INTEGER}
</update>
<update id="updateSnapshot" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
update stream_source
set snapshot = #{snapshot,jdbcType=LONGVARCHAR},
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- modify_time = modify_time
+ report_time = #{reportTime,jdbcType=TIMESTAMP}
where id = #{id,jdbcType=INTEGER}
</update>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index ed42ff4..0f4691d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -163,7 +163,7 @@ public class AgentServiceImpl implements AgentService {
nextStatus = SourceState.SOURCE_FAILED.getCode();
}
- sourceMapper.updateStatus(taskId, nextStatus);
+ sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
}
// Other tasks with status 20x will change to 30x in next getTaskResult method
@@ -197,7 +197,7 @@ public class AgentServiceImpl implements AgentService {
int op = status % MODULUS_100;
if (status / MODULUS_100 == UNISSUED_STATUS) {
int nextStatus = ISSUED_STATUS * MODULUS_100 + op;
- sourceMapper.updateStatus(id, nextStatus);
+ sourceMapper.updateStatus(id, nextStatus, entity.getModifyTime());
LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, id);
} else {
LOGGER.info("skip task status not in 20x, id={}", id);
@@ -212,7 +212,7 @@ public class AgentServiceImpl implements AgentService {
// Update agentIp and uuid for the added and active tasks
for (StreamSourceEntity entity : addList) {
- sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid);
+ sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}] ", agentIp, uuid, entity.getId());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index cff6760..bbf7eeb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -230,9 +230,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
// Check whether the current status can be modified
this.checkGroupCanUpdate(entity, groupRequest, operator);
CommonBeanUtils.copyProperties(groupRequest, entity, true);
- if (GroupState.CONFIG_FAILED.getCode().equals(entity.getStatus())) {
- entity.setStatus(GroupState.TO_BE_SUBMIT.getCode());
- }
entity.setModifier(operator);
groupMapper.updateByIdentifierSelective(entity);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index ef6ba02..cc3d97f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -29,9 +29,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceDbBasicInfo;
import org.apache.inlong.manager.common.pojo.source.SourceDbDetailInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceFileBasicInfo;
import org.apache.inlong.manager.common.pojo.source.SourceFileDetailInfo;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullPageUpdateRequest;
@@ -70,7 +68,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
-import java.util.Locale;
import java.util.stream.Collectors;
/**
@@ -493,41 +490,15 @@ public class InlongStreamServiceImpl implements InlongStreamService {
FullStreamResponse pageInfo = new FullStreamResponse();
pageInfo.setStreamInfo(streamInfo);
- // 3. Query the basic and detailed information of the data source
- String dataSourceType = streamInfo.getDataSourceType();
- if (StringUtils.isEmpty(dataSourceType)) {
- continue;
- }
- switch (dataSourceType.toUpperCase(Locale.ROOT)) {
- case Constant.DATA_SOURCE_FILE:
- SourceFileBasicInfo fileBasicInfo = sourceFileService.getBasicByIdentifier(groupId, streamId);
- pageInfo.setFileBasicInfo(fileBasicInfo);
- List<SourceFileDetailInfo> fileDetailInfoList = sourceFileService.listDetailByIdentifier(groupId,
- streamId);
- pageInfo.setFileDetailInfoList(fileDetailInfoList);
- break;
- case Constant.DATA_SOURCE_DB:
- SourceDbBasicInfo dbBasicInfo = sourceDbService.getBasicByIdentifier(groupId, streamId);
- pageInfo.setDbBasicInfo(dbBasicInfo);
- List<SourceDbDetailInfo> dbDetailInfoList = sourceDbService.listDetailByIdentifier(groupId,
- streamId);
- pageInfo.setDbDetailInfoList(dbDetailInfoList);
- break;
- case Constant.DATA_SOURCE_AUTO_PUSH:
- break;
- default:
- throw new BusinessException(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORTED);
- }
-
- // 4. Query stream sources information
+ // 3. Query stream sources information
List<SourceResponse> sourceList = sourceService.listSource(groupId, streamId);
pageInfo.setSourceInfo(sourceList);
- // 5. Query various stream sinks and its extended information, field information
+ // 4. Query various stream sinks and its extended information, field information
List<SinkResponse> sinkList = sinkService.listSink(groupId, streamId);
pageInfo.setSinkInfo(sinkList);
- // 6. Add a single result to the paginated list
+ // 5. Add a single result to the paginated list
responseList.add(pageInfo);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 27460db..9bc2f7a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -68,7 +68,6 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
protected abstract SourceResponse getResponse();
@Override
- @Transactional(isolation = Isolation.REPEATABLE_READ)
public SourceResponse getById(@NotNull Integer id) {
StreamSourceEntity entity = sourceMapper.selectById(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
index e9fadfe..213a0e1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -132,7 +132,8 @@ public class SourceSnapshotOperation implements AutoCloseable {
// Update the status from temporary to normal
Integer status = idStatusMap.get(id);
if (SourceState.TEMP_TO_NORMAL.contains(status)) {
- sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode());
+ StreamSourceEntity source = sourceMapper.selectByIdForUpdate(id);
+ sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode(), source.getModifyTime());
}
}
@@ -143,10 +144,12 @@ public class SourceSnapshotOperation implements AutoCloseable {
Integer cacheId = entry.getKey();
Integer cacheStatus = entry.getValue();
if (!currentTaskIdSet.contains(cacheId)) {
+ StreamSourceEntity source = sourceMapper.selectByIdForUpdate(cacheId);
if (Objects.equal(cacheStatus, SourceState.BEEN_ISSUED_DELETE.getCode())) {
- sourceMapper.updateStatus(cacheId, SourceState.SOURCE_DISABLE.getCode());
+ sourceMapper.updateStatus(cacheId, SourceState.SOURCE_DISABLE.getCode(),
+ source.getModifyTime());
} else if (Objects.equal(cacheStatus, SourceState.BEEN_ISSUED_FROZEN.getCode())) {
- sourceMapper.updateStatus(cacheId, SourceState.SOURCE_FROZEN.getCode());
+ sourceMapper.updateStatus(cacheId, SourceState.SOURCE_FROZEN.getCode(), source.getModifyTime());
}
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 3b5c092..b6ec0f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -170,7 +170,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class)
public boolean updateStatus(Integer id, Integer targetStatus, String operator) {
- sourceMapper.updateStatus(id, targetStatus);
+ sourceMapper.updateStatus(id, targetStatus, null);
LOGGER.info("success to update source status={} for id={} by {}", targetStatus, id, operator);
return true;
}