You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/03/17 10:56:52 UTC

[inlong] 03/04: [INLONG-7619][Manager] Support update and retry MySQL sources after updating MySQLDataNode (#7621)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ee9aaf10de9a3beeea897f3d9a2b80d92689b743
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Thu Mar 16 18:43:17 2023 +0800

    [INLONG-7619][Manager] Support update and retry MySQL sources after updating MySQLDataNode (#7621)
---
 .../inlong/manager/common/enums/GroupStatus.java   |  8 +++++
 .../dao/mapper/StreamSourceEntityMapper.java       | 16 +++++++++
 .../resources/mappers/StreamSourceEntityMapper.xml | 42 +++++++++++++++++++++-
 .../service/node/AbstractDataNodeOperator.java     | 29 +++++++++++++--
 .../manager/service/node/DataNodeOperator.java     |  4 ++-
 .../manager/service/node/DataNodeServiceImpl.java  |  3 +-
 .../service/node/mysql/MySQLDataNodeOperator.java  | 15 ++++++++
 7 files changed, 112 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index 565b43576..c265a0ba8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -126,6 +126,14 @@ public enum GroupStatus {
                 || status == GroupStatus.CONFIG_FAILED;
     }
 
+    /**
+     * Checks whether the given status allows updating stream source.
+     */
+    public static boolean allowedUpdateSource(GroupStatus status) {
+        return status == GroupStatus.CONFIG_SUCCESSFUL
+                || status == GroupStatus.CONFIG_FAILED;
+    }
+
     /**
      * Checks whether the given status needs to delete the inlong stream first.
      */
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index ba7ad89d1..99a06ac34 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -119,6 +119,12 @@ public interface StreamSourceEntityMapper {
      */
     List<String> selectSourceType(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
+    /**
+     * Query need update source according to the dataNodeName , clusterName, sourceType
+     */
+    List<Integer> selectNeedUpdateIdsByClusterAndDataNode(@Param("clusterName") String clusterName,
+            @Param("nodeName") String nodeName, @Param("sourceType") String sourceType);
+
     int updateByPrimaryKeySelective(StreamSourceEntity record);
 
     int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
@@ -148,6 +154,16 @@ public interface StreamSourceEntityMapper {
 
     int updateSnapshot(StreamSourceEntity entity);
 
+    /**
+     * Update the source status
+     *
+     * @param idList source id list
+     * @param status modify the status to this
+     * @param operator operator name
+     */
+    void updateStatusByIds(@Param("idList") List<Integer> idList, @Param("status") Integer status,
+            @Param("operator") String operator);
+
     /**
      * Physical delete stream sources by group id and stream id
      */
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 0f6aeb5bb..7c2df0199 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -319,7 +319,29 @@
             for update
         </where>
     </select>
-
+    <select id="selectNeedUpdateIdsByClusterAndDataNode" resultType="java.lang.Integer">
+        select source.id
+        from stream_source source, inlong_stream stream, inlong_group inlong_group
+        <where>
+            source.is_deleted = 0
+            and inlong_group.inlong_group_id = source.inlong_group_id
+            and inlong_group.is_deleted = 0
+            and inlong_group.status in (120, 130)
+            and stream.inlong_group_id = source.inlong_group_id
+            and stream.is_deleted = 0
+            and stream.status in (120, 130)
+            and source.status not in (99, 110)
+            <if test="clusterName != null">
+                and source.inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
+            </if>
+            <if test="nodeName != null">
+                and source.data_node_name = #{nodeName, jdbcType=VARCHAR}
+            </if>
+            <if test="sourceType != null">
+                and source.source_type = #{sourceType, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
     <update id="updateByRelatedId">
         update stream_source
         <set>
@@ -458,6 +480,24 @@
             modify_time = modify_time
         where id = #{id,jdbcType=INTEGER}
     </update>
+    <update id="updateStatusByIds">
+        update stream_source
+        <set>
+            previous_status = status,
+            status          = #{status, jdbcType=INTEGER},
+            modifier        = #{operator, jdbcType=VARCHAR},
+            version         = version + 1
+        </set>
+        <where>
+            is_deleted = 0
+            <if test="idList != null and idList.size() > 0">
+                and id in
+                <foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
+                    #{item}
+                </foreach>
+            </if>
+        </where>
+    </update>
 
     <delete id="deleteByRelatedId">
         delete
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index a2e4c2eb1..efa174fd9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -19,11 +19,15 @@ package org.apache.inlong.manager.service.node;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.DataNodeEntity;
 import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.node.DataNodeRequest;
 import org.slf4j.Logger;
@@ -33,6 +37,7 @@ import org.springframework.transaction.annotation.Isolation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -44,6 +49,12 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
 
     @Autowired
     protected DataNodeEntityMapper dataNodeEntityMapper;
+    @Autowired
+    protected StreamSourceEntityMapper sourceMapper;
+    @Autowired
+    protected InlongGroupEntityMapper groupMapper;
+    @Autowired
+    protected InlongStreamEntityMapper streamMapper;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -72,7 +83,6 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
         DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
         // set the ext params
         this.setTargetEntity(request, entity);
-        this.updateRelatedStreamSource(request);
         entity.setModifier(operator);
         int rowCount = dataNodeEntityMapper.updateByIdSelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
@@ -95,7 +105,22 @@ public abstract class AbstractDataNodeOperator implements DataNodeOperator {
     }
 
     @Override
-    public void updateRelatedStreamSource(DataNodeRequest request) {
+    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
         LOGGER.info("do nothing for the data node type ={}", request.getType());
     }
+
+    public void retryStreamSourceByDataNodeNameAndType(String dataNodeName, String type, String operator) {
+        Integer status = SourceStatus.TO_BE_ISSUED_RETRY.getCode();
+        LOGGER.info("begin to update stream source status by dataNodeName={}, status={}, by operator={}",
+                dataNodeName, status, operator);
+        List<Integer> needUpdateIds = sourceMapper.selectNeedUpdateIdsByClusterAndDataNode(null, dataNodeName, type);
+        try {
+            sourceMapper.updateStatusByIds(needUpdateIds, status, operator);
+            LOGGER.info("success to update stream source status by dataNodeName={}, status={}, by operator={}",
+                    dataNodeName, status, operator);
+        } catch (Exception e) {
+            LOGGER.error("failed to update stream source status by dataNodeName={}, status={}, by operator={}",
+                    dataNodeName, status, operator, e);
+        }
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index a0618f4bb..58a12b732 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -83,7 +83,9 @@ public interface DataNodeOperator {
      * Update related stream source.
      *
      * @param request data node request
+     * @param entity data node entity
+     * @param operator operator
      */
-    void updateRelatedStreamSource(DataNodeRequest request);
+    void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator);
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index d44f32d03..b2d094e64 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -200,7 +200,7 @@ public class DataNodeServiceImpl implements DataNodeService {
         }
         DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
         dataNodeOperator.updateOpt(request, operator);
-
+        dataNodeOperator.updateRelatedStreamSource(request, curEntity, operator);
         LOGGER.info("success to update data node={}", request);
         return true;
     }
@@ -238,6 +238,7 @@ public class DataNodeServiceImpl implements DataNodeService {
         }
         DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
         dataNodeOperator.updateOpt(request, opInfo.getName());
+        dataNodeOperator.updateRelatedStreamSource(request, curEntity, opInfo.getName());
         return true;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
index 7bc1f8f4b..f3e108ab5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/mysql/MySQLDataNodeOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.node.mysql;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -38,6 +39,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.sql.Connection;
+import java.util.Objects;
 
 /**
  * MySQL data node operator
@@ -106,4 +108,17 @@ public class MySQLDataNodeOperator extends AbstractDataNodeOperator {
         }
     }
 
+    @Override
+    public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity entity, String operator) {
+        MySQLDataNodeRequest mySQLDataNodeRequest = (MySQLDataNodeRequest) request;
+        MySQLDataNodeInfo mySQLDataNodeInfo = (MySQLDataNodeInfo) this.getFromEntity(entity);
+        boolean changed = !Objects.equals(mySQLDataNodeRequest.getUrl(), mySQLDataNodeInfo.getUrl())
+                || !Objects.equals(mySQLDataNodeRequest.getBackupUrl(), mySQLDataNodeInfo.getBackupUrl())
+                || !Objects.equals(mySQLDataNodeRequest.getUsername(), mySQLDataNodeInfo.getUsername())
+                || !Objects.equals(mySQLDataNodeRequest.getToken(), mySQLDataNodeInfo.getToken());
+        if (changed) {
+            retryStreamSourceByDataNodeNameAndType(request.getName(), SourceType.MYSQL_SQL, operator);
+        }
+    }
+
 }