You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/23 08:39:39 UTC
[incubator-inlong] branch master updated: [INLONG-3312][Manager] Optimize the delete operation for stream source (#3320)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf33f7 [INLONG-3312][Manager] Optimize the delete operation for stream source (#3320)
7bf33f7 is described below
commit 7bf33f791d40837acdc7424af57d801b7a5e1713
Author: healchow <he...@gmail.com>
AuthorDate: Wed Mar 23 16:39:35 2022 +0800
[INLONG-3312][Manager] Optimize the delete operation for stream source (#3320)
---
.../dao/mapper/StreamSourceEntityMapper.java | 33 ++----------
.../resources/mappers/StreamSourceEntityMapper.xml | 56 ++-----------------
.../service/core/impl/AgentServiceImpl.java | 23 ++++----
.../manager/service/sink/StreamSinkService.java | 18 +++----
.../service/sink/StreamSinkServiceImpl.java | 62 +++++++++-------------
.../source/AbstractStreamSourceOperation.java | 5 +-
.../service/source/SourceSnapshotOperation.java | 3 +-
.../service/source/StreamSourceService.java | 10 ----
.../service/source/StreamSourceServiceImpl.java | 40 ++++----------
9 files changed, 73 insertions(+), 177 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 0eee410..9603275 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -22,7 +22,6 @@ import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.springframework.stereotype.Repository;
-import java.util.Date;
import java.util.List;
@Repository
@@ -36,8 +35,6 @@ public interface StreamSourceEntityMapper {
/**
* Query un-deleted sources by the given agentIp.
- *
- * @apiNote Sources with is_deleted > 0 need to be filtered.
*/
List<StreamSourceEntity> selectByAgentIp(@Param("agentIp") String agentIp);
@@ -58,30 +55,14 @@ public interface StreamSourceEntityMapper {
@Param("sourceName") String sourceName);
/**
- * According to the group id, stream id and source type, query valid source entity list.
- */
- List<StreamSourceEntity> selectByRelatedIdForUpdate(@Param("groupId") String groupId,
- @Param("streamId") String streamId, @Param("sourceType") String sourceType);
-
- /**
- * Query the tasks that need to be added for update.
- */
- List<StreamSourceEntity> selectByStatusForUpdate(@Param("list") List<Integer> list);
-
-
- /**
- * Query the tasks that need to be added.
+ * Query the tasks by the given status list.
*/
List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list);
/**
- * Query the sources with status 20x by the given agent IP and agent UUID for update.
- */
- List<StreamSourceEntity> selectByStatusAndIpForUpdate(@Param("statusList") List<Integer> statusList,
- @Param("agentIp") String agentIp, @Param("uuid") String uuid);
-
- /**
* Query the sources with status 20x by the given agent IP and agent UUID.
+ *
+ * @apiNote Sources with is_deleted > 0 need to be filtered.
*/
List<StreamSourceEntity> selectByStatusAndIp(@Param("statusList") List<Integer> statusList,
@Param("agentIp") String agentIp, @Param("uuid") String uuid);
@@ -97,11 +78,9 @@ public interface StreamSourceEntityMapper {
/**
* Update the status to `nextStatus` by the given id.
- *
- * @apiNote Should not change the modify_time
*/
int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus,
- @Param("modifyTime") Date modifyTime);
+ @Param("changeTime") Boolean changeModifyTime);
/**
* Update the status to `nextStatus` by the given group id and stream id.
@@ -113,11 +92,9 @@ public interface StreamSourceEntityMapper {
/**
* Update the agentIp and uuid.
- *
- * @apiNote Should not change the modify_time
*/
int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid,
- @Param("modifyTime") Date modifyTime);
+ @Param("changeTime") Boolean changeModifyTime);
int updateSnapshot(StreamSourceEntity entity);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index be1c3c0..963e4bb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -146,35 +146,6 @@
</if>
</where>
</select>
- <select id="selectByRelatedIdForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
- select
- <include refid="Base_Column_List"/>
- from stream_source
- <where>
- is_deleted = 0
- and inlong_group_id = #{groupId, jdbcType=VARCHAR}
- <if test="streamId != null and streamId != ''">
- and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
- </if>
- <if test="sourceType != null and sourceType != ''">
- and source_type = #{sourceType, jdbcType=VARCHAR}
- </if>
- for update
- </where>
- </select>
- <select id="selectByStatusForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
- select
- <include refid="Base_Column_List"/>
- from stream_source
- <where>
- is_deleted = 0
- and status in
- <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
- #{item}
- </foreach>
- limit 2 for update
- </where>
- </select>
<select id="selectByStatus" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
@@ -185,23 +156,7 @@
<foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
#{item}
</foreach>
- </where>
- </select>
- <select id="selectByStatusAndIpForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
- select
- <include refid="Base_Column_List"/>
- from stream_source
- <where>
- is_deleted = 0
- and status in
- <foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
- #{item}
- </foreach>
- and agent_ip = #{agentIp, jdbcType=VARCHAR}
- <if test="uuid != null and uuid != ''">
- and uuid = #{uuid, jdbcType=VARCHAR}
- </if>
- for update
+ limit 2
</where>
</select>
<select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -209,7 +164,6 @@
<include refid="Base_Column_List"/>
from stream_source
<where>
- is_deleted = 0
and status in
<foreach item="item" index="index" collection="statusList" open="(" close=")" separator=",">
#{item}
@@ -333,8 +287,8 @@
update stream_source
set previous_status = status,
status = #{nextStatus, jdbcType=INTEGER}
- <if test="modifyTime != null">
- , modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ <if test="changeTime == false">
+ , modify_time = modify_time
</if>
where id = #{id, jdbcType=INTEGER}
</update>
@@ -353,8 +307,8 @@
update stream_source
set agent_ip = #{agentIp,jdbcType=VARCHAR},
uuid = #{uuid,jdbcType=VARCHAR}
- <if test="modifyTime != null">
- , modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ <if test="changeTime == false">
+ , modify_time = modify_time
</if>
where id = #{id, jdbcType=INTEGER}
</update>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 7f67bed..3ecde6c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -155,8 +155,8 @@ public class AgentServiceImpl implements AgentService {
nextStatus = SourceState.SOURCE_FAILED.getCode();
}
- sourceMapper.updateStatus(taskId, nextStatus, current.getModifyTime());
- LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, taskId);
+ sourceMapper.updateStatus(taskId, nextStatus, false);
+ LOGGER.info("update stream source status to [{}] for id [{}]", nextStatus, taskId);
}
}
@@ -172,9 +172,9 @@ public class AgentServiceImpl implements AgentService {
}
// Query the tasks that needed to add or active - without agentIp and uuid
- List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
+ List<Integer> needAddStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
SourceState.TO_BE_ISSUED_ACTIVE.getCode());
- List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(addedStatusList);
+ List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(needAddStatusList);
String agentIp = request.getAgentIp();
String uuid = request.getUuid();
@@ -183,8 +183,8 @@ public class AgentServiceImpl implements AgentService {
SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
- List<StreamSourceEntity> addedList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
- entityList.addAll(addedList);
+ List<StreamSourceEntity> needIssuedList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+ entityList.addAll(needIssuedList);
List<DataConfig> dataConfigs = Lists.newArrayList();
for (StreamSourceEntity entity : entityList) {
@@ -195,7 +195,7 @@ public class AgentServiceImpl implements AgentService {
int op = status % MODULUS_100;
if (status / MODULUS_100 == UNISSUED_STATUS) {
int nextStatus = ISSUED_STATUS * MODULUS_100 + op;
- sourceMapper.updateStatus(id, nextStatus, entity.getModifyTime());
+ sourceMapper.updateStatus(id, nextStatus, false);
LOGGER.info("update stream source status to [{}] for id [{}] ", nextStatus, id);
} else {
LOGGER.info("skip task status not in 20x, id={}", id);
@@ -211,7 +211,7 @@ public class AgentServiceImpl implements AgentService {
// Update agentIp and uuid for the added and active tasks
for (StreamSourceEntity entity : entityList) {
if (StringUtils.isEmpty(entity.getAgentIp())) {
- sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, entity.getModifyTime());
+ sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, false);
LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}]", agentIp, uuid, entity.getId());
}
}
@@ -239,7 +239,12 @@ public class AgentServiceImpl implements AgentService {
dataConfig.setInlongGroupId(groupId);
dataConfig.setInlongStreamId(streamId);
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
- dataConfig.setSyncSend(streamEntity.getSyncSend());
+ if (streamEntity != null) {
+ dataConfig.setSyncSend(streamEntity.getSyncSend());
+ } else {
+ dataConfig.setSyncSend(0);
+ LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
+ }
return dataConfig;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index dc83bb2..b04784b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -95,6 +95,15 @@ public interface StreamSinkService {
Boolean update(SinkRequest sinkRequest, String operator);
/**
+ * Modify sink data status.
+ *
+ * @param id Sink id.
+ * @param status Target status.
+ * @param log Modify the log.
+ */
+ void updateStatus(int id, int status, String log);
+
+ /**
* Delete the stream sink by the given id and sink type.
*
* @param id The primary key of the sink.
@@ -105,15 +114,6 @@ public interface StreamSinkService {
Boolean delete(Integer id, String sinkType, String operator);
/**
- * Modify sink data status.
- *
- * @param id Sink id.
- * @param status Target status.
- * @param log Modify the log.
- */
- void updateStatus(int id, int status, String log);
-
- /**
* Logically delete stream sink with the given conditions.
*
* @param groupId InLong group id to which the data source belongs.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 7c9290d..f030959 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -95,9 +95,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
@Transactional(rollbackFor = Throwable.class)
public Integer save(SinkRequest request, String operator) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to save sink info=" + request);
- }
+ LOGGER.info("begin to save sink info: {}", request);
this.checkParams(request);
// Check if it can be added
@@ -120,16 +118,15 @@ public class StreamSinkServiceImpl implements StreamSinkService {
executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
}
- LOGGER.info("success to save sink info");
+ LOGGER.info("success to save sink info: {}", request);
return id;
}
@Override
public SinkResponse get(Integer id, String sinkType) {
- LOGGER.debug("begin to get sink by id={}, sinkType={}", id, sinkType);
StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
SinkResponse sinkResponse = operation.getById(sinkType, id);
- LOGGER.debug("success to get sink info");
+ LOGGER.debug("success to get sink by id={}, sinkType={}", id, sinkType);
return sinkResponse;
}
@@ -142,9 +139,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public List<SinkResponse> listSink(String groupId, String streamId) {
- LOGGER.debug("begin to list sink by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-
List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
@@ -152,30 +147,25 @@ public class StreamSinkServiceImpl implements StreamSinkService {
List<SinkResponse> responseList = new ArrayList<>();
entityList.forEach(entity -> responseList.add(this.get(entity.getId(), entity.getSinkType())));
- LOGGER.info("success to list sink");
+ LOGGER.debug("success to list sink by groupId={}, streamId={}", groupId, streamId);
return responseList;
}
@Override
public List<SinkBriefResponse> listBrief(String groupId, String streamId) {
- LOGGER.debug("begin to list sink summary by groupId=" + groupId + ", streamId=" + streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
// Query all sink information and encapsulate it in the result set
List<SinkBriefResponse> summaryList = sinkMapper.selectSummary(groupId, streamId);
- LOGGER.debug("success to list sink summary");
+ LOGGER.debug("success to list sink summary by groupId=" + groupId + ", streamId=" + streamId);
return summaryList;
}
@Override
public PageInfo<? extends SinkListResponse> listByCondition(SinkPageRequest request) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to list sink page by " + request);
- }
Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
-
PageHelper.startPage(request.getPageNum(), request.getPageSize());
List<StreamSinkEntity> entityPage = sinkMapper.selectByCondition(request);
Map<SinkType, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -193,14 +183,14 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
PageInfo<? extends SinkListResponse> pageInfo = PageInfo.of(sinkListResponses);
- LOGGER.debug("success to list sink page");
+ LOGGER.debug("success to list sink page, result size {}", pageInfo.getSize());
return pageInfo;
}
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean update(SinkRequest request, String operator) {
- LOGGER.info("begin to update sink info={}", request);
+ LOGGER.info("begin to update sink info: {}", request);
this.checkParams(request);
Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
@@ -219,10 +209,21 @@ public class StreamSinkServiceImpl implements StreamSinkService {
if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
}
- LOGGER.info("success to update sink info");
+ LOGGER.info("success to update sink info: {}", request);
return true;
}
+ @Override
+ public void updateStatus(int id, int status, String log) {
+ StreamSinkEntity entity = new StreamSinkEntity();
+ entity.setId(id);
+ entity.setStatus(status);
+ entity.setOperateLog(log);
+ sinkMapper.updateStatus(entity);
+
+ LOGGER.info("success to update sink status={} for id={} with log: {}", status, id, log);
+ }
+
@Transactional(rollbackFor = Throwable.class)
@Override
public Boolean delete(Integer id, String sinkType, String operator) {
@@ -240,23 +241,13 @@ public class StreamSinkServiceImpl implements StreamSinkService {
entity.setModifier(operator);
entity.setModifyTime(new Date());
sinkMapper.updateByPrimaryKeySelective(entity);
-
sinkFieldMapper.logicDeleteAll(id);
- LOGGER.info("success to delete sink info");
+ LOGGER.info("success to delete sink info: {}", entity);
return true;
}
@Override
- public void updateStatus(int id, int status, String log) {
- StreamSinkEntity entity = new StreamSinkEntity();
- entity.setId(id);
- entity.setStatus(status);
- entity.setOperateLog(log);
- sinkMapper.updateStatus(entity);
- }
-
- @Override
@Transactional(rollbackFor = Throwable.class)
public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
@@ -322,21 +313,18 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public List<String> getSinkTypeList(String groupId, String streamId) {
- LOGGER.debug("begin to get sink type list by groupId={}, streamId={}", groupId, streamId);
if (StringUtils.isEmpty(streamId)) {
return Collections.emptyList();
}
List<String> resultList = sinkMapper.selectSinkType(groupId, streamId);
- LOGGER.debug("success to get sink type list, result sinkType={}", resultList);
+ LOGGER.debug("success to get sink type by groupId={}, streamId={}, result={}", groupId, streamId, resultList);
return resultList;
}
@Override
public Boolean updateAfterApprove(List<SinkApproveDTO> approveList, String operator) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to update sink after approve={}", approveList);
- }
+ LOGGER.info("begin to update sink after approve: {}", approveList);
if (CollectionUtils.isEmpty(approveList)) {
return true;
}
@@ -358,7 +346,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
sinkMapper.updateByPrimaryKeySelective(entity);
}
- LOGGER.info("success to update sink after approve");
+ LOGGER.info("success to update sink after approve: {}", approveList);
return true;
}
@@ -392,13 +380,13 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public void run() {
String groupId = inlongGroupEntity.getInlongGroupId();
- LOGGER.info("begin start inlong stream workflow, groupId={}, streamId={}", groupId, streamId);
+ LOGGER.info("begin start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
InlongGroupInfo groupInfo = CommonBeanUtils.copyProperties(inlongGroupEntity, InlongGroupInfo::new);
GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, streamId);
workflowService.start(ProcessName.CREATE_STREAM_RESOURCE, operator, form);
- LOGGER.info("success start inlong stream workflow, groupId={}, streamId={}", groupId, streamId);
+ LOGGER.info("success start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 918ce16..c3eafd9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.service.source;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -41,7 +40,9 @@ import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.List;
-@Slf4j
+/**
+ * Default operation of stream source.
+ */
public abstract class AbstractStreamSourceOperation implements StreamSourceOperation {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamSourceOperation.class);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
index 713ebdd..ae5e6ee 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -142,8 +142,7 @@ public class SourceSnapshotOperation implements AutoCloseable {
Integer status = idStatusMap.get(id);
if (SourceState.TEMP_TO_NORMAL.contains(status)) {
isInvalid = true;
- StreamSourceEntity source = sourceMapper.selectByIdForUpdate(id);
- sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode(), source.getModifyTime());
+ sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode(), false);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 5f876b3..2cd3de8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -84,16 +84,6 @@ public interface StreamSourceService {
boolean update(SourceRequest sourceRequest, String operator);
/**
- * Update source status.
- *
- * @param id The source id.
- * @param targetStatus The target status.
- * @param operator The operator name.
- * @return whether succeed
- */
- boolean updateStatus(Integer id, Integer targetStatus, String operator);
-
- /**
* Update source status by the given groupId and streamId
*
* @param groupId The belongs group id.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 3de257a..40bf56b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -71,9 +71,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
public Integer save(SourceRequest request, String operator) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to save source info=" + request);
- }
+ LOGGER.info("begin to save source info: {}", request);
this.checkParams(request);
// Check if it can be added
@@ -85,16 +83,15 @@ public class StreamSourceServiceImpl implements StreamSourceService {
StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
- LOGGER.info("success to save source info");
+ LOGGER.info("success to save source info: {}", request);
return id;
}
@Override
public SourceResponse get(Integer id, String sourceType) {
- LOGGER.debug("begin to get source by id={}, sourceType={}", id, sourceType);
StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
SourceResponse sourceResponse = operation.getById(id);
- LOGGER.debug("success to get source info");
+ LOGGER.debug("success to get source by id={}", id);
return sourceResponse;
}
@@ -107,27 +104,20 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
public List<SourceResponse> listSource(String groupId, String streamId) {
- LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
-
List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
}
List<SourceResponse> responseList = new ArrayList<>();
entityList.forEach(entity -> responseList.add(this.get(entity.getId(), entity.getSourceType())));
-
- LOGGER.info("success to list source");
+ LOGGER.debug("success to list source by groupId={}, streamId={}", groupId, streamId);
return responseList;
}
@Override
public PageInfo<? extends SourceListResponse> listByCondition(SourcePageRequest request) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to list source page by " + request);
- }
Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
-
PageHelper.startPage(request.getPageNum(), request.getPageSize());
List<StreamSourceEntity> entityPage = sourceMapper.selectByCondition(request);
@@ -145,7 +135,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
sourceListResponses.addAll(pageInfo.getList());
}
PageInfo<? extends SourceListResponse> pageInfo = PageInfo.of(sourceListResponses);
- LOGGER.debug("success to list source page");
+
+ LOGGER.debug("success to list source page, result size {}", pageInfo.getSize());
return pageInfo;
}
@@ -153,7 +144,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
isolation = Isolation.READ_COMMITTED)
public boolean update(SourceRequest request, String operator) {
- LOGGER.debug("begin to update source info=" + request);
+ LOGGER.info("begin to update source info: {}", request);
this.checkParams(request);
Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
@@ -165,16 +156,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
operation.updateOpt(request, groupEntity.getStatus(), operator);
- LOGGER.info("success to update source info");
- return true;
- }
-
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW,
- isolation = Isolation.READ_COMMITTED)
- public boolean updateStatus(Integer id, Integer targetStatus, String operator) {
- sourceMapper.updateStatus(id, targetStatus, null);
- LOGGER.info("success to update source status={} for id={} by {}", targetStatus, id, operator);
+ LOGGER.info("success to update source info: {}", request);
return true;
}
@@ -204,7 +186,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
operation.deleteOpt(sourceRequest, operator);
- LOGGER.info("success to delete source info:{}", entity);
+ LOGGER.info("success to delete source info: {}", entity);
return true;
}
@@ -222,7 +204,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
operation.restartOpt(sourceRequest, operator);
- LOGGER.info("success to restart source info:{}", entity);
+ LOGGER.info("success to restart source info: {}", entity);
return true;
}
@@ -240,7 +222,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
operation.stopOpt(sourceRequest, operator);
- LOGGER.info("success to stop source info:{}", entity);
+ LOGGER.info("success to stop source info: {}", entity);
return true;
}