You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/17 10:56:52 UTC
[inlong] 03/04: [INLONG-7619][Manager] Support update and retry MySQL sources after updating MySQLDataNode (#7621)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ee9aaf10de9a3beeea897f3d9a2b80d92689b743
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Mar 16 18:43:17 2023 +0800
[INLONG-7619][Manager] Support update and retry MySQL sources after updating MySQLDataNode (#7621)
---
.../inlong/manager/common/enums/GroupStatus.java | 8 +++++
.../dao/mapper/StreamSourceEntityMapper.java | 16 +++++++++
.../resources/mappers/StreamSourceEntityMapper.xml | 42 +++++++++++++++++++++-
.../service/node/AbstractDataNodeOperator.java | 29 +++++++++++++--
.../manager/service/node/DataNodeOperator.java | 4 ++-
.../manager/service/node/DataNodeServiceImpl.java | 3 +-
.../service/node/mysql/MySQLDataNodeOperator.java | 15 ++++++++
7 files changed, 112 insertions(+), 5 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index 565b43576..c265a0ba8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -126,6 +126,14 @@ public enum GroupStatus {
|| status == GroupStatus.CONFIG_FAILED;
}
+    /**
+     * Tells whether stream sources may be updated while the inlong group is in the given status.
+     *
+     * @param status the group status to check
+     * @return true only when the status is CONFIG_SUCCESSFUL or CONFIG_FAILED
+     */
+    public static boolean allowedUpdateSource(GroupStatus status) {
+        if (status == GroupStatus.CONFIG_SUCCESSFUL) {
+            return true;
+        }
+        return status == GroupStatus.CONFIG_FAILED;
+    }
+
/**
* Checks whether the given status needs to delete the inlong stream first.
*/
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ba7ad89d1..99a06ac34 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -119,6 +119,12 @@ public interface StreamSourceEntityMapper {
*/
List<String> selectSourceType(@Param("groupId") String groupId, @Param("streamId") String streamId);
+ /**
+ * Query the ids of the stream sources that need to be updated, filtered by the inlong cluster name,
+ * the data node name and the source type. A filter whose value is null is not applied.
+ *
+ * @param clusterName inlong cluster name of the sources, may be null
+ * @param nodeName data node name of the sources, may be null
+ * @param sourceType type of the sources, may be null
+ * @return ids of the matching stream sources
+ */
+ List<Integer> selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String clusterName,
+ @Param("nodeName") String nodeName, @Param("sourceType") String sourceType);
+
int updateByPrimaryKeySelective(StreamSourceEntity record);
int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
@@ -148,6 +154,16 @@ public interface StreamSourceEntityMapper {
int updateSnapshot(StreamSourceEntity entity);
+ /**
+ * Update the status of the stream sources with the given ids.
+ * The current status of each updated source is saved as its previous status.
+ *
+ * @param idList ids of the stream sources to update
+ * @param status the new status to set
+ * @param operator name of the operator, saved as the modifier of the sources
+ */
+ void updateStatusByIds(@Param("idList") List<Integer> idList, @Param("status") Integer status,
+ @Param("operator") String operator);
+
/**
* Physical delete stream sources by group id and stream id
*/
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 0f6aeb5bb..7c2df0199 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -319,7 +319,29 @@
for update
</where>
</select>
-
+    <!--
+      Select the ids of the non-deleted stream sources whose inlong group and inlong stream are
+      both non-deleted and in status 120 or 130, and whose own status is neither 99 nor 110.
+      NOTE(review): 120/130 are presumably the codes of GroupStatus.CONFIG_FAILED and
+      CONFIG_SUCCESSFUL, and 99/110 source statuses that must not be retried; verify against the enums.
+      The optional clusterName, nodeName and sourceType filters are applied only when not null.
+      The stream is joined on both the group id and the stream id, so that a source is matched
+      against its own stream only, and each source id is returned once.
+    -->
+    <select id="selectNeedUpdateIdsByClusterAndDataNode" resultType="java.lang.Integer">
+        select source.id
+        from stream_source source, inlong_stream stream, inlong_group inlong_group
+        <where>
+            source.is_deleted = 0
+            and inlong_group.inlong_group_id = source.inlong_group_id
+            and inlong_group.is_deleted = 0
+            and inlong_group.status in (120, 130)
+            and stream.inlong_group_id = source.inlong_group_id
+            and stream.inlong_stream_id = source.inlong_stream_id
+            and stream.is_deleted = 0
+            and stream.status in (120, 130)
+            and source.status not in (99, 110)
+            <if test="clusterName != null">
+                and source.inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+            </if>
+            <if test="nodeName != null">
+                and source.data_node_name = #{nodeName, jdbcType=VARCHAR}
+            </if>
+            <if test="sourceType != null">
+                and source.source_type = #{sourceType, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
<update id="updateByRelatedId">
update stream_source
<set>
@@ -458,6 +480,24 @@
modify_time = modify_time
where id = #{id,jdbcType=INTEGER}
</update>
+    <!--
+      Set a new status on the non-deleted stream sources whose ids are in idList: the current
+      status is copied to previous_status, the modifier is set to the operator and the version
+      is incremented.
+      A null or empty idList must update nothing, so that case gets an always-false condition
+      instead of dropping the id filter (which would update every non-deleted source).
+      previous_status is assigned before status so that it receives the old value on databases
+      that evaluate the assignments from left to right (e.g. MySQL).
+    -->
+    <update id="updateStatusByIds">
+        update stream_source
+        <set>
+            previous_status = status,
+            status = #{status, jdbcType=INTEGER},
+            modifier = #{operator, jdbcType=VARCHAR},
+            version = version + 1
+        </set>
+        <where>
+            is_deleted = 0
+            <choose>
+                <when test="idList != null and idList.size() > 0">
+                    and id in
+                    <foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
+                        #{item}
+                    </foreach>
+                </when>
+                <otherwise>
+                    and 1 = 0
+                </otherwise>
+            </choose>
+        </where>
+    </update>
<delete id="deleteByRelatedId">
delete
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index a2e4c2eb1..efa174fd9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -19,11 +19,15 @@ package org.apache.inlong.manager.service.node;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.slf4j.Logger;
@@ -33,6 +37,7 @@ import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -44,6 +49,12 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
@Autowired
protected DataNodeEntityMapper dataNodeEntityMapper;
+ @Autowired
+ protected StreamSourceEntityMapper sourceMapper;
+ @Autowired
+ protected InlongGroupEntityMapper groupMapper;
+ @Autowired
+ protected InlongStreamEntityMapper streamMapper;
@Override
@Transactional(rollbackFor = Throwable.class)
@@ -72,7 +83,6 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
// set the ext params
this.setTargetEntity(request, entity);
- this.updateRelatedStreamSource(request);
entity.setModifier(operator);
int rowCount = dataNodeEntityMapper.updateByIdSelective(entity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
@@ -95,7 +105,22 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
}
@Override
- public void updateRelatedStreamSource(DataNodeRequest request) {
+ public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
LOGGER.info("do nothing for the data node type ={}", request.getType());
}
+
+    /**
+     * Sets the status of the stream sources that use the given data node to TO_BE_ISSUED_RETRY.
+     * <p/>
+     * The ids of the sources are selected by data node name and source type. A failure of the
+     * status update is logged and not rethrown.
+     *
+     * @param dataNodeName name of the data node used by the stream sources
+     * @param type type of the stream sources to retry
+     * @param operator name of the operator
+     */
+    public void retryStreamSourceByDataNodeNameAndType(String dataNodeName, String type, String operator) {
+        Integer status = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
+        LOGGER.info("begin to update stream source status by dataNodeName={}, status={}, by operator={}",
+                dataNodeName, status, operator);
+        List<Integer> needUpdateIds = sourceMapper.selectNeedUpdateIdsByClusterAndDataNode(null, dataNodeName, type);
+        // never run the update with an empty id list: there is nothing to retry, and an update
+        // without an id filter would change sources that do not belong to this data node
+        if (needUpdateIds == null || needUpdateIds.isEmpty()) {
+            LOGGER.info("no stream source needs to be updated for dataNodeName={}, type={}", dataNodeName, type);
+            return;
+        }
+        try {
+            sourceMapper.updateStatusByIds(needUpdateIds, status, operator);
+            LOGGER.info("success to update stream source status by dataNodeName={}, status={}, by operator={}",
+                    dataNodeName, status, operator);
+        } catch (Exception e) {
+            // best effort: the failure is only logged
+            LOGGER.error("failed to update stream source status by dataNodeName={}, status={}, by operator={}",
+                    dataNodeName, status, operator, e);
+        }
+    }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index a0618f4bb..58a12b732 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -83,7 +83,9 @@ public interface DataNodeOperator {
* Update related stream source.
*
* @param request data node request
+ * @param entity data node entity
+ * @param operator operator
*/
- void updateRelatedStreamSource(DataNodeRequest request);
+ void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index d44f32d03..b2d094e64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -200,7 +200,7 @@ public class DataNodeServiceImpl implements DataNodeService {
}
DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
dataNodeOperator.updateOpt(request, operator);
-
+ dataNodeOperator.updateRelatedStreamSource(request, curEntity, operator);
LOGGER.info("success to update data node={}", request);
return true;
}
@@ -238,6 +238,7 @@ public class DataNodeServiceImpl implements DataNodeService {
}
DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
dataNodeOperator.updateOpt(request, opInfo.getName());
+ dataNodeOperator.updateRelatedStreamSource(request, curEntity, opInfo.getName());
return true;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index 7bc1f8f4b..f3e108ab5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.node.mysql;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -38,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.Connection;
+import java.util.Objects;
/**
* MySQL data node operator
@@ -106,4 +108,17 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
}
}
+    /**
+     * Retries the MySQL stream sources of this data node when its connection settings differ
+     * between the request and the given entity.
+     * <p/>
+     * The compared settings are the url, the backup url, the username and the token.
+     *
+     * @param request data node request, must be a MySQLDataNodeRequest
+     * @param entity data node entity to compare the request with
+     * @param operator name of the operator
+     */
+    @Override
+    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
+        MySQLDataNodeRequest requested = (MySQLDataNodeRequest) request;
+        MySQLDataNodeInfo stored = (MySQLDataNodeInfo) this.getFromEntity(entity);
+        boolean unchanged = Objects.equals(requested.getUrl(), stored.getUrl())
+                && Objects.equals(requested.getBackupUrl(), stored.getBackupUrl())
+                && Objects.equals(requested.getUsername(), stored.getUsername())
+                && Objects.equals(requested.getToken(), stored.getToken());
+        if (unchanged) {
+            // same connection settings: the sources do not need to be issued again
+            return;
+        }
+        retryStreamSourceByDataNodeNameAndType(request.getName(), SourceType.MYSQL_SQL, operator);
+    }
+
}