You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/07 10:26:32 UTC
[incubator-inlong] branch master updated: [INLONG-2974][Manager] Support agent to pull tasks without ip and uuid (#2976)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dd6915e [INLONG-2974][Manager] Support agent to pull tasks without ip and uuid (#2976)
dd6915e is described below
commit dd6915e7c72407f0d580a90cd01c50e0ea39f6c9
Author: healchow <he...@gmail.com>
AuthorDate: Mon Mar 7 18:26:29 2022 +0800
[INLONG-2974][Manager] Support agent to pull tasks without ip and uuid (#2976)
* [INLONG-2974][Manager] Support agent to pull tasks without ip and uuid
* [INLONG-2974][Manager] Optimize query parameters
* [INLONG-2974][Manager] Change the isolation level to REPEATABLE_READ
---
.../dao/mapper/StreamSourceEntityMapper.java | 15 ++++-
.../resources/mappers/StreamSourceEntityMapper.xml | 27 +++++++-
.../service/core/impl/AgentServiceImpl.java | 72 +++++++++++++++-------
3 files changed, 89 insertions(+), 25 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index f1a9797..0bfb691 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -71,9 +71,15 @@ public interface StreamSourceEntityMapper {
@Param("sourceType") String sourceType);
/**
+ * Query the tasks that need to be added.
+ */
+ List<StreamSourceEntity> selectByStatusForUpdate(@Param("list") List<Integer> list);
+
+ /**
* Query the sources with status 20x by the given agent IP and agent UUID.
*/
- List<StreamSourceEntity> selectByIpAndUuid(@Param("agentIp") String agentIp, @Param("uuid") String uuid);
+ List<StreamSourceEntity> selectByStatusAndIp(@Param("list") List<Integer> list,
+ @Param("agentIp") String agentIp, @Param("uuid") String uuid);
/**
* Get the distinct source type from the given groupId and streamId
@@ -106,6 +112,13 @@ public interface StreamSourceEntityMapper {
int updateStatusByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
@Param("nextStatus") Integer nextStatus);
+ /**
+ * Update the agentIp and uuid.
+ *
+ * @apiNote Should not change the modify_time
+ */
+ int updateIpAndUuid(@Param("id") Integer id, @Param("agentIp") String agentIp, @Param("uuid") String uuid);
+
int updateSnapshot(StreamSourceEntity entity);
int deleteByPrimaryKey(Integer id);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c8909f7..595b69a 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -277,13 +277,29 @@
</if>
</where>
</select>
- <select id="selectByIpAndUuid" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ <select id="selectByStatusForUpdate" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
select
<include refid="Base_Column_List"/>
from stream_source
<where>
is_deleted = 0
- and status >= 200 and status < 300
+ and status in
+ <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ limit 2 for update
+ </where>
+ </select>
+ <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ <where>
+ is_deleted = 0
+ and status in
+ <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
and agent_ip = #{agentIp, jdbcType=VARCHAR}
<if test="uuid != null and uuid != ''">
and uuid = #{uuid, jdbcType=VARCHAR}
@@ -418,6 +434,13 @@
</if>
</where>
</update>
+ <update id="updateIpAndUuid">
+ update stream_source
+ set agent_ip = #{agentIp,jdbcType=VARCHAR},
+ uuid = #{uuid,jdbcType=VARCHAR},
+ modify_time = modify_time
+ where id = #{id, jdbcType=INTEGER}
+ </update>
<update id="updateSnapshot" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
update stream_source
set snapshot = #{snapshot,jdbcType=LONGVARCHAR},
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index a95e691..e882af5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -60,12 +60,15 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -170,12 +173,24 @@ public class AgentServiceImpl implements AgentService {
/**
* Get task result by the request
*/
- private TaskResult getTaskResult(TaskRequest request) {
- // Query all tasks with status in 20x
+ @Transactional(isolation = Isolation.REPEATABLE_READ)
+ TaskResult getTaskResult(TaskRequest request) {
+ // Query the tasks that needed to add or active - without agentIp and uuid
+ List<Integer> addedStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(),
+ SourceState.TO_BE_ISSUED_ACTIVE.getCode());
+ List<StreamSourceEntity> addList = sourceMapper.selectByStatusForUpdate(addedStatusList);
+
+ // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
+ List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
+ SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
+ SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(),
+ SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
String agentIp = request.getAgentIp();
String uuid = request.getUuid();
+ List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
+ entityList.addAll(addList);
+
List<DataConfig> dataConfigs = Lists.newArrayList();
- List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
for (StreamSourceEntity entity : entityList) {
// Change 20x to 30x
int id = entity.getId();
@@ -188,33 +203,46 @@ public class AgentServiceImpl implements AgentService {
continue;
}
- DataConfig dataConfig = new DataConfig();
- dataConfig.setIp(entity.getAgentIp());
- dataConfig.setUuid(entity.getUuid());
- dataConfig.setOp(String.valueOf(op));
- dataConfig.setTaskId(entity.getId());
- dataConfig.setTaskType(getTaskType(entity));
- dataConfig.setTaskName(entity.getSourceName());
- dataConfig.setSnapshot(entity.getSnapshot());
- dataConfig.setExtParams(entity.getExtParams());
- LocalDateTime dateTime = LocalDateTime.ofInstant(entity.getModifyTime().toInstant(),
- ZoneId.systemDefault());
- dataConfig.setDeliveryTime(dateTime.format(TIME_FORMATTER));
-
- String groupId = entity.getInlongGroupId();
- String streamId = entity.getInlongStreamId();
- dataConfig.setInlongGroupId(groupId);
- dataConfig.setInlongStreamId(streamId);
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
- dataConfig.setSyncSend(streamEntity.getSyncSend());
+ DataConfig dataConfig = getDataConfig(entity, op);
dataConfigs.add(dataConfig);
}
// Query pending special commands
List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
+ // Update agentIp and uuid for the added and active tasks
+ for (StreamSourceEntity entity : addList) {
+ sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid);
+ }
+
return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
}
+ /**
+ * Get the DataConfig from the stream source entity
+ */
+ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
+ DataConfig dataConfig = new DataConfig();
+ dataConfig.setIp(entity.getAgentIp());
+ dataConfig.setUuid(entity.getUuid());
+ dataConfig.setOp(String.valueOf(op));
+ dataConfig.setTaskId(entity.getId());
+ dataConfig.setTaskType(getTaskType(entity));
+ dataConfig.setTaskName(entity.getSourceName());
+ dataConfig.setSnapshot(entity.getSnapshot());
+ dataConfig.setExtParams(entity.getExtParams());
+ LocalDateTime dateTime = LocalDateTime.ofInstant(entity.getModifyTime().toInstant(),
+ ZoneId.systemDefault());
+ dataConfig.setDeliveryTime(dateTime.format(TIME_FORMATTER));
+
+ String groupId = entity.getInlongGroupId();
+ String streamId = entity.getInlongStreamId();
+ dataConfig.setInlongGroupId(groupId);
+ dataConfig.setInlongStreamId(streamId);
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ dataConfig.setSyncSend(streamEntity.getSyncSend());
+ return dataConfig;
+ }
+
private int getTaskType(StreamSourceEntity sourceEntity) {
SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
if (sourceType != SourceType.BINLOG) {