You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/27 05:13:14 UTC

[incubator-inlong] branch master updated: [INLONG-3958][Manager] Fix some null pointer exceptions (#3959)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 19992c6ea [INLONG-3958][Manager] Fix some null pointer exceptions (#3959)
19992c6ea is described below

commit 19992c6eab1a630870235c55278e0642cb5aa147
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Apr 27 13:13:08 2022 +0800

    [INLONG-3958][Manager] Fix some null pointer exceptions (#3959)
    
    * Fix NPE in Manager Service
    
    * InsertSelective->Insert
    
    * fix NPE in manager service
    
    * remove unused import
---
 .../client/api/source/MySQLBinlogSource.java       |   6 +-
 .../client/api/util/InlongGroupTransfer.java       |   2 +-
 .../api/util/InlongStreamSourceTransfer.java       |   2 +-
 .../inlong/manager/common/enums/FieldType.java     |   2 +-
 .../manager/common/pojo/source/SourceRequest.java  |   2 +-
 .../mappers/StreamSourceFieldEntityMapper.xml      |  19 +-
 .../mappers/StreamTransformEntityMapper.xml        | 422 +++++++++++----------
 .../mappers/StreamTransformFieldEntityMapper.xml   |  20 +-
 .../service/core/impl/InlongGroupServiceImpl.java  |  10 +-
 .../service/sort/util/ExtractNodeUtils.java        |   3 +
 .../service/sort/util/FilterFunctionUtils.java     |   3 +
 .../manager/service/sort/util/LoadNodeUtils.java   |   7 +-
 .../service/sort/util/NodeRelationShipUtils.java   |   4 +
 .../service/sort/util/TransformNodeUtils.java      |   2 +
 .../service/source/AbstractSourceOperation.java    |   4 +-
 .../service/source/StreamSourceServiceImpl.java    |   1 +
 .../transform/StreamTransformServiceImpl.java      |  10 +-
 .../service/workflow/WorkflowServiceImplTest.java  |   6 +-
 .../main/resources/sql/apache_inlong_manager.sql   |   6 +-
 19 files changed, 285 insertions(+), 246 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 11aa256d9..46653155c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -23,10 +23,10 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.DataFormat;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
 
 import java.util.List;
 
@@ -59,7 +59,7 @@ public class MySQLBinlogSource extends StreamSource {
     private int port = 3306;
 
     @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
-    private int serverId;
+    private int serverId = 0;
 
     @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
             + "separate them with commas, for example: db1,test_db*",
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 1b830ab2a..2930ae512 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -34,8 +34,8 @@ import org.apache.inlong.manager.client.api.auth.Authentication;
 import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
 import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.client.api.auth.TokenAuthentication;
-import org.apache.inlong.manager.common.enums.GroupMode;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.GroupMode;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index ea6a7238d..dc7847fe8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -280,7 +280,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setPassword(authentication.getPassword());
         sourceRequest.setHostname(binlogSource.getHostname());
         sourceRequest.setPort(binlogSource.getPort());
-        binlogSource.setServerId(binlogSource.getServerId());
+        sourceRequest.setServerId(binlogSource.getServerId());
         sourceRequest.setIncludeSchema(binlogSource.getIncludeSchema());
         sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
         sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index 5cb23d012..60433ca35 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -48,7 +48,7 @@ public enum FieldType {
     public static FieldType forName(String name) {
         Preconditions.checkNotNull(name, "FieldType should not be null");
         for (FieldType value : values()) {
-            if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
+            if (value.toString().equals(name) || value.toString().equals(name.toLowerCase(Locale.ROOT))) {
                 return value;
             }
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 6cb66e5e8..8f4cd533a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -57,7 +57,7 @@ public class SourceRequest {
     private String uuid;
 
     @ApiModelProperty("Id of the source server")
-    private Integer serverId;
+    private Integer serverId = 0;
 
     @ApiModelProperty("Id of the cluster that collected this source")
     private Integer clusterId;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index 123384e72..065bee1a1 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -54,7 +54,8 @@
         from stream_source_field
         where id = #{id,jdbcType=INTEGER}
     </delete>
-    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
         insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
                                          source_id, source_type, field_name,
                                          field_value, pre_expression, field_type,
@@ -66,7 +67,8 @@
                 #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
                 #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
-    <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+    <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
         insert into stream_source_field
         <trim prefix="(" suffix=")" suffixOverrides=",">
             <if test="id != null">
@@ -229,11 +231,14 @@
         rank_num, is_deleted)
         values
         <foreach collection="list" index="index" item="item" separator=",">
-            (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
-            #{sourceId,jdbcType=INTEGER}, #{sourceType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
-            #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
-            #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
-            #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+            (#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
+            #{item.inlongStreamId,jdbcType=VARCHAR},
+            #{item.sourceId,jdbcType=INTEGER}, #{item.sourceType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
+            #{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
+            #{item.fieldType,jdbcType=VARCHAR},
+            #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
+            #{item.fieldFormat,jdbcType=VARCHAR},
+            #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
         </foreach>
     </insert>
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
index 6cddcec46..135f95704 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
@@ -20,215 +20,219 @@
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper">
-  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
-    <id column="id" jdbcType="INTEGER" property="id" />
-    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
-    <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
-    <result column="transform_name" jdbcType="VARCHAR" property="transformName" />
-    <result column="transform_type" jdbcType="VARCHAR" property="transformType" />
-    <result column="pre_node_names" jdbcType="VARCHAR" property="preNodeNames" />
-    <result column="post_node_names" jdbcType="VARCHAR" property="postNodeNames" />
-    <result column="transform_definition" jdbcType="VARCHAR" property="transformDefinition" />
-    <result column="version" jdbcType="INTEGER" property="version" />
-    <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
-    <result column="creator" jdbcType="VARCHAR" property="creator" />
-    <result column="modifier" jdbcType="VARCHAR" property="modifier" />
-    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
-    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
-  </resultMap>
-  <sql id="Base_Column_List">
-    id, inlong_group_id, inlong_stream_id, transform_name, transform_type, pre_node_names, 
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+        <result column="transform_name" jdbcType="VARCHAR" property="transformName"/>
+        <result column="transform_type" jdbcType="VARCHAR" property="transformType"/>
+        <result column="pre_node_names" jdbcType="VARCHAR" property="preNodeNames"/>
+        <result column="post_node_names" jdbcType="VARCHAR" property="postNodeNames"/>
+        <result column="transform_definition" jdbcType="VARCHAR" property="transformDefinition"/>
+        <result column="version" jdbcType="INTEGER" property="version"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+        <result column="creator" jdbcType="VARCHAR" property="creator"/>
+        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+    </resultMap>
+    <sql id="Base_Column_List">
+        id
+        , inlong_group_id, inlong_stream_id, transform_name, transform_type, pre_node_names,
     post_node_names, transform_definition, version, is_deleted, creator, modifier, create_time, 
     modify_time
-  </sql>
-  <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
-    select 
-    <include refid="Base_Column_List" />
-    from stream_transform
-    where id = #{id,jdbcType=INTEGER}
-  </select>
-  <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
-    select
-    <include refid="Base_Column_List"/>
-    from stream_transform
-    <where>
-      is_deleted = 0
-      and inlong_group_id = #{groupId, jdbcType=VARCHAR}
-      <if test="streamId != null and streamId != ''">
-        and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
-      </if>
-      <if test="transformName != null and transformName != ''">
-        and transform_name = #{transformName, jdbcType=VARCHAR}
-      </if>
-    </where>
-  </select>
-  <delete id="deleteById" parameterType="java.lang.Integer">
-    delete from stream_transform
-    where id = #{id,jdbcType=INTEGER}
-  </delete>
-  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
-    insert into stream_transform (id, inlong_group_id, inlong_stream_id, 
-      transform_name, transform_type, pre_node_names, 
-      post_node_names, transform_definition, version, 
-      is_deleted, creator, modifier, 
-      create_time, modify_time)
-    values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, 
-      #{transformName,jdbcType=VARCHAR}, #{transformType,jdbcType=VARCHAR}, #{preNodeNames,jdbcType=VARCHAR}, 
-      #{postNodeNames,jdbcType=VARCHAR}, #{transformDefinition,jdbcType=VARCHAR}, #{version,jdbcType=INTEGER}, 
-      #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}, 
-      #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
-  </insert>
-  <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
-    insert into stream_transform
-    <trim prefix="(" suffix=")" suffixOverrides=",">
-      <if test="id != null">
-        id,
-      </if>
-      <if test="inlongGroupId != null">
-        inlong_group_id,
-      </if>
-      <if test="inlongStreamId != null">
-        inlong_stream_id,
-      </if>
-      <if test="transformName != null">
-        transform_name,
-      </if>
-      <if test="transformType != null">
-        transform_type,
-      </if>
-      <if test="preNodeNames != null">
-        pre_node_names,
-      </if>
-      <if test="postNodeNames != null">
-        post_node_names,
-      </if>
-      <if test="transformDefinition != null">
-        transform_definition,
-      </if>
-      <if test="version != null">
-        version,
-      </if>
-      <if test="isDeleted != null">
-        is_deleted,
-      </if>
-      <if test="creator != null">
-        creator,
-      </if>
-      <if test="modifier != null">
-        modifier,
-      </if>
-      <if test="createTime != null">
-        create_time,
-      </if>
-      <if test="modifyTime != null">
-        modify_time,
-      </if>
-    </trim>
-    <trim prefix="values (" suffix=")" suffixOverrides=",">
-      <if test="id != null">
-        #{id,jdbcType=INTEGER},
-      </if>
-      <if test="inlongGroupId != null">
-        #{inlongGroupId,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongStreamId != null">
-        #{inlongStreamId,jdbcType=VARCHAR},
-      </if>
-      <if test="transformName != null">
-        #{transformName,jdbcType=VARCHAR},
-      </if>
-      <if test="transformType != null">
-        #{transformType,jdbcType=VARCHAR},
-      </if>
-      <if test="preNodeNames != null">
-        #{preNodeNames,jdbcType=VARCHAR},
-      </if>
-      <if test="postNodeNames != null">
-        #{postNodeNames,jdbcType=VARCHAR},
-      </if>
-      <if test="transformDefinition != null">
-        #{transformDefinition,jdbcType=VARCHAR},
-      </if>
-      <if test="version != null">
-        #{version,jdbcType=INTEGER},
-      </if>
-      <if test="isDeleted != null">
-        #{isDeleted,jdbcType=INTEGER},
-      </if>
-      <if test="creator != null">
-        #{creator,jdbcType=VARCHAR},
-      </if>
-      <if test="modifier != null">
-        #{modifier,jdbcType=VARCHAR},
-      </if>
-      <if test="createTime != null">
-        #{createTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="modifyTime != null">
-        #{modifyTime,jdbcType=TIMESTAMP},
-      </if>
-    </trim>
-  </insert>
-  <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
-    update stream_transform
-    <set>
-      <if test="inlongGroupId != null">
-        inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      </if>
-      <if test="inlongStreamId != null">
-        inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      </if>
-      <if test="transformName != null">
-        transform_name = #{transformName,jdbcType=VARCHAR},
-      </if>
-      <if test="transformType != null">
-        transform_type = #{transformType,jdbcType=VARCHAR},
-      </if>
-      <if test="preNodeNames != null">
-        pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
-      </if>
-      <if test="postNodeNames != null">
-        post_node_names = #{postNodeNames,jdbcType=VARCHAR},
-      </if>
-      <if test="transformDefinition != null">
-        transform_definition = #{transformDefinition,jdbcType=VARCHAR},
-      </if>
-      <if test="version != null">
-        version = #{version,jdbcType=INTEGER},
-      </if>
-      <if test="isDeleted != null">
-        is_deleted = #{isDeleted,jdbcType=INTEGER},
-      </if>
-      <if test="creator != null">
-        creator = #{creator,jdbcType=VARCHAR},
-      </if>
-      <if test="modifier != null">
-        modifier = #{modifier,jdbcType=VARCHAR},
-      </if>
-      <if test="createTime != null">
-        create_time = #{createTime,jdbcType=TIMESTAMP},
-      </if>
-      <if test="modifyTime != null">
-        modify_time = #{modifyTime,jdbcType=TIMESTAMP},
-      </if>
-    </set>
-    where id = #{id,jdbcType=INTEGER}
-  </update>
-  <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
-    update stream_transform
-    set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-      inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-      transform_name = #{transformName,jdbcType=VARCHAR},
-      transform_type = #{transformType,jdbcType=VARCHAR},
-      pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
-      post_node_names = #{postNodeNames,jdbcType=VARCHAR},
-      transform_definition = #{transformDefinition,jdbcType=VARCHAR},
-      version = #{version,jdbcType=INTEGER},
-      is_deleted = #{isDeleted,jdbcType=INTEGER},
-      creator = #{creator,jdbcType=VARCHAR},
-      modifier = #{modifier,jdbcType=VARCHAR},
-      create_time = #{createTime,jdbcType=TIMESTAMP},
-      modify_time = #{modifyTime,jdbcType=TIMESTAMP}
-    where id = #{id,jdbcType=INTEGER}
-  </update>
+    </sql>
+    <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_transform
+        where id = #{id,jdbcType=INTEGER}
+    </select>
+    <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_transform
+        <where>
+            is_deleted = 0
+            and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+            <if test="streamId != null and streamId != ''">
+                and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+            </if>
+            <if test="transformName != null and transformName != ''">
+                and transform_name = #{transformName, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
+    <delete id="deleteById" parameterType="java.lang.Integer">
+        delete
+        from stream_transform
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+        insert into stream_transform (id, inlong_group_id, inlong_stream_id,
+                                      transform_name, transform_type, pre_node_names,
+                                      post_node_names, transform_definition, version,
+                                      is_deleted, creator, modifier,
+                                      create_time, modify_time)
+        values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+                #{transformName,jdbcType=VARCHAR}, #{transformType,jdbcType=VARCHAR}, #{preNodeNames,jdbcType=VARCHAR},
+                #{postNodeNames,jdbcType=VARCHAR}, #{transformDefinition,jdbcType=VARCHAR}, #{version,jdbcType=INTEGER},
+                #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
+                #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
+    </insert>
+    <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+        insert into stream_transform
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="id != null">
+                id,
+            </if>
+            <if test="inlongGroupId != null">
+                inlong_group_id,
+            </if>
+            <if test="inlongStreamId != null">
+                inlong_stream_id,
+            </if>
+            <if test="transformName != null">
+                transform_name,
+            </if>
+            <if test="transformType != null">
+                transform_type,
+            </if>
+            <if test="preNodeNames != null">
+                pre_node_names,
+            </if>
+            <if test="postNodeNames != null">
+                post_node_names,
+            </if>
+            <if test="transformDefinition != null">
+                transform_definition,
+            </if>
+            <if test="version != null">
+                version,
+            </if>
+            <if test="isDeleted != null">
+                is_deleted,
+            </if>
+            <if test="creator != null">
+                creator,
+            </if>
+            <if test="modifier != null">
+                modifier,
+            </if>
+            <if test="createTime != null">
+                create_time,
+            </if>
+            <if test="modifyTime != null">
+                modify_time,
+            </if>
+        </trim>
+        <trim prefix="values (" suffix=")" suffixOverrides=",">
+            <if test="id != null">
+                #{id,jdbcType=INTEGER},
+            </if>
+            <if test="inlongGroupId != null">
+                #{inlongGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="inlongStreamId != null">
+                #{inlongStreamId,jdbcType=VARCHAR},
+            </if>
+            <if test="transformName != null">
+                #{transformName,jdbcType=VARCHAR},
+            </if>
+            <if test="transformType != null">
+                #{transformType,jdbcType=VARCHAR},
+            </if>
+            <if test="preNodeNames != null">
+                #{preNodeNames,jdbcType=VARCHAR},
+            </if>
+            <if test="postNodeNames != null">
+                #{postNodeNames,jdbcType=VARCHAR},
+            </if>
+            <if test="transformDefinition != null">
+                #{transformDefinition,jdbcType=VARCHAR},
+            </if>
+            <if test="version != null">
+                #{version,jdbcType=INTEGER},
+            </if>
+            <if test="isDeleted != null">
+                #{isDeleted,jdbcType=INTEGER},
+            </if>
+            <if test="creator != null">
+                #{creator,jdbcType=VARCHAR},
+            </if>
+            <if test="modifier != null">
+                #{modifier,jdbcType=VARCHAR},
+            </if>
+            <if test="createTime != null">
+                #{createTime,jdbcType=TIMESTAMP},
+            </if>
+            <if test="modifyTime != null">
+                #{modifyTime,jdbcType=TIMESTAMP},
+            </if>
+        </trim>
+    </insert>
+    <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+        update stream_transform
+        <set>
+            <if test="inlongGroupId != null">
+                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="inlongStreamId != null">
+                inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+            </if>
+            <if test="transformName != null">
+                transform_name = #{transformName,jdbcType=VARCHAR},
+            </if>
+            <if test="transformType != null">
+                transform_type = #{transformType,jdbcType=VARCHAR},
+            </if>
+            <if test="preNodeNames != null">
+                pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
+            </if>
+            <if test="postNodeNames != null">
+                post_node_names = #{postNodeNames,jdbcType=VARCHAR},
+            </if>
+            <if test="transformDefinition != null">
+                transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+            </if>
+            <if test="version != null">
+                version = #{version,jdbcType=INTEGER},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+            <if test="creator != null">
+                creator = #{creator,jdbcType=VARCHAR},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier,jdbcType=VARCHAR},
+            </if>
+            <if test="createTime != null">
+                create_time = #{createTime,jdbcType=TIMESTAMP},
+            </if>
+            <if test="modifyTime != null">
+                modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+            </if>
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+    </update>
+    <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+        update stream_transform
+        set inlong_group_id      = #{inlongGroupId,jdbcType=VARCHAR},
+            inlong_stream_id     = #{inlongStreamId,jdbcType=VARCHAR},
+            transform_name       = #{transformName,jdbcType=VARCHAR},
+            transform_type       = #{transformType,jdbcType=VARCHAR},
+            pre_node_names       = #{preNodeNames,jdbcType=VARCHAR},
+            post_node_names      = #{postNodeNames,jdbcType=VARCHAR},
+            transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+            version              = #{version,jdbcType=INTEGER},
+            is_deleted           = #{isDeleted,jdbcType=INTEGER},
+            creator              = #{creator,jdbcType=VARCHAR},
+            modifier             = #{modifier,jdbcType=VARCHAR},
+            create_time          = #{createTime,jdbcType=TIMESTAMP},
+            modify_time          = #{modifyTime,jdbcType=TIMESTAMP}
+        where id = #{id,jdbcType=INTEGER}
+    </update>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index ff8aaa034..7e058bc68 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -70,7 +70,8 @@
         from stream_transform_field
         where id = #{id,jdbcType=INTEGER}
     </delete>
-    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
         insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
                                             transform_id, transform_type, field_name,
                                             field_value, pre_expression, field_type,
@@ -82,7 +83,8 @@
                 #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
                 #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
     </insert>
-    <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
+    <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
         insert into stream_transform_field
         <trim prefix="(" suffix=")" suffixOverrides=",">
             <if test="id != null">
@@ -245,11 +247,15 @@
         rank_num, is_deleted)
         values
         <foreach collection="list" index="index" item="item" separator=",">
-            (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
-            #{transformId,jdbcType=INTEGER}, #{transformType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
-            #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
-            #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
-            #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+            (#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
+            #{item.inlongStreamId,jdbcType=VARCHAR},
+            #{item.transformId,jdbcType=INTEGER}, #{item.transformType,jdbcType=VARCHAR},
+            #{item.fieldName,jdbcType=VARCHAR},
+            #{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
+            #{item.fieldType,jdbcType=VARCHAR},
+            #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
+            #{item.fieldFormat,jdbcType=VARCHAR},
+            #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
         </foreach>
     </insert>
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index 4228bb4b2..24be9d000 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -128,14 +128,11 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         entity.setCreateTime(new Date());
         groupMapper.insertSelective(entity);
         this.saveOrUpdateExt(groupId, groupInfo.getExtList());
-
+        // Saving MQ information.
         Middleware mqMiddleware = groupMqFactory.getMqMiddleware(MQType.forType(groupInfo.getMiddlewareType()));
-
-        if (StringUtils.isBlank(groupInfo.getMqExtInfo().getInlongGroupId())) {
+        if (groupInfo.getMqExtInfo() != null && StringUtils.isBlank(groupInfo.getMqExtInfo().getInlongGroupId())) {
             groupInfo.getMqExtInfo().setInlongGroupId(groupId);
         }
-
-        // Saving MQ information.
         mqMiddleware.save(groupInfo.getMqExtInfo());
         LOGGER.debug("success to save inlong group info for groupId={}", groupId);
         return groupId;
@@ -163,7 +160,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         groupInfo.setMqExtInfo(mq.get(groupId));
 
         // For approved inlong group, encapsulate the cluster address of the middleware
-
         if (GroupStatus.CONFIG_SUCCESSFUL == GroupStatus.forCode(groupInfo.getStatus())) {
             groupInfo = mq.packSpecificInfo(groupInfo);
         }
@@ -234,7 +230,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         MQType mqType = MQType.forType(groupRequest.getMiddlewareType());
         Middleware mqMiddleware = groupMqFactory.getMqMiddleware(mqType);
         InlongGroupMqExtBase mqExtInfo = groupRequest.getMqExtInfo();
-        if (StringUtils.isBlank(mqExtInfo.getInlongGroupId())) {
+        if (mqExtInfo != null && StringUtils.isBlank(mqExtInfo.getInlongGroupId())) {
             mqExtInfo.setInlongGroupId(groupId);
         }
         mqMiddleware.update(mqExtInfo);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 965065a34..9198c5754 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sort.util;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
@@ -46,6 +47,7 @@ import java.util.stream.Collectors;
 /**
  * Parse SourceResponse to ExtractNode which sort needed
  */
+@Slf4j
 public class ExtractNodeUtils {
 
     public static List<ExtractNode> createExtractNodes(List<SourceResponse> sourceResponses) {
@@ -111,6 +113,7 @@ public class ExtractNodeUtils {
 
     /**
      * Create KafkaExtractNode based KafkaSourceResponse
+     *
      * @param kafkaSourceResponse
      * @return
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
index 76c2d820e..bb24eb87d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
@@ -108,6 +108,9 @@ public class FilterFunctionUtils {
     }
 
     private static LogicOperator parseLogicOperator(RuleRelation relation) {
+        if (relation == null) {
+            return EmptyOperator.getInstance();
+        }
         switch (relation) {
             case OR:
                 return OrOperator.getInstance();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 155c46596..e56feafb9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sort.util;
 
 import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
@@ -61,6 +62,7 @@ public class LoadNodeUtils {
     }
 
     public static KafkaLoadNode createLoadNode(KafkaSinkResponse kafkaSinkResponse) {
+
         String id = kafkaSinkResponse.getSinkName();
         String name = kafkaSinkResponse.getSinkName();
         String topicName = kafkaSinkResponse.getTopicName();
@@ -74,7 +76,10 @@ public class LoadNodeUtils {
                 .map(fieldInfo -> new FieldRelationShip(fieldInfo, fieldInfo)).collect(Collectors.toList());
         Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-        Integer sinkParallelism = Integer.parseInt(kafkaSinkResponse.getPartitionNum());
+        Integer sinkParallelism = null;
+        if (StringUtils.isNotEmpty(kafkaSinkResponse.getPartitionNum())) {
+            sinkParallelism = Integer.parseInt(kafkaSinkResponse.getPartitionNum());
+        }
         DataTypeEnum dataType = DataTypeEnum.forName(kafkaSinkResponse.getSerializationType());
         Format format;
         switch (dataType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index 646acc085..ea004bbf1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sort.util;
 
 import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.shaded.com.google.common.collect.Maps;
 import org.apache.inlong.manager.common.enums.TransformType;
@@ -75,6 +76,9 @@ public class NodeRelationShipUtils {
     }
 
     public static void optimizeNodeRelationShips(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
+        if (CollectionUtils.isEmpty(transformResponses)) {
+            return;
+        }
         Map<String, TransformDefinition> transformTypeMap = transformResponses.stream().collect(
                 Collectors.toMap(transformResponse -> transformResponse.getTransformName(),
                         transformResponse -> {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 77e65ed35..e913229cb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.sort.util;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.TransformType;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -37,6 +38,7 @@ import java.util.stream.Collectors;
 /**
  * Parse TransformResponse to TransformNode which sort needed
  */
+@Slf4j
 public class TransformNodeUtils {
 
     public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index 4cbf4ed58..6382e6c7d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -101,7 +102,7 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
         Date now = new Date();
         entity.setCreateTime(now);
         entity.setModifyTime(now);
-        entity.setIsDeleted(0);
+        entity.setIsDeleted(GlobalConstants.UN_DELETED);
         // get the ext params
         setTargetEntity(request, entity);
         sourceMapper.insert(entity);
@@ -242,6 +243,7 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
             fieldEntity.setInlongStreamId(streamId);
             fieldEntity.setSourceId(sourceId);
             fieldEntity.setSourceType(sourceType);
+            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
             entityList.add(fieldEntity);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 7e029328f..5e3929e17 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -110,6 +110,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         }
         List<SourceResponse> responseList = new ArrayList<>();
         entityList.forEach(entity -> responseList.add(this.get(entity.getId(), entity.getSourceType())));
+
         LOGGER.debug("success to list source by groupId={}, streamId={}", groupId, streamId);
         return responseList;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index f5c90a84d..42785b8c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
@@ -79,12 +80,14 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         }
         StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
                 StreamTransformEntity::new);
+        transformEntity.setVersion(0);
         transformEntity.setCreator(operator);
         transformEntity.setModifier(operator);
         Date now = new Date();
         transformEntity.setCreateTime(now);
         transformEntity.setModifyTime(now);
-        transformEntityMapper.insertSelective(transformEntity);
+        transformEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+        transformEntityMapper.insert(transformEntity);
         saveFieldOpt(transformEntity, transformRequest.getFieldList());
         return transformEntity.getId();
     }
@@ -115,7 +118,9 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         transformResponses.stream().forEach(transformResponse -> {
             int transformId = transformResponse.getId();
             List<InlongStreamFieldInfo> fieldInfos = fieldInfoMap.get(transformId);
-            transformResponse.setFieldList(fieldInfos);
+            if (CollectionUtils.isNotEmpty(fieldInfos)) {
+                transformResponse.setFieldList(fieldInfos);
+            }
         });
         return transformResponses;
     }
@@ -215,6 +220,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
             fieldEntity.setInlongStreamId(streamId);
             fieldEntity.setTransformId(transformId);
             fieldEntity.setTransformType(transformType);
+            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
             entityList.add(fieldEntity);
         }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 4d8b5e56b..99878a6f6 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -61,8 +61,11 @@ import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 import org.apache.inlong.manager.workflow.util.WorkflowBeanUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import springfox.boot.starter.autoconfigure.OpenApiAutoConfiguration;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -72,7 +75,8 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@EnableAutoConfiguration
+@RunWith(SpringJUnit4ClassRunner.class)
+@EnableAutoConfiguration(exclude = OpenApiAutoConfiguration.class)
 public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     public static final String OPERATOR = "admin";
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 64a388729..d797e2091 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -1165,8 +1165,7 @@ CREATE TABLE `group_heartbeat`
     `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
-    KEY `index_report_time` (`report_time`)
+    UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
 
@@ -1187,8 +1186,7 @@ CREATE TABLE `stream_heartbeat`
     `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
-    KEY `index_report_time` (`report_time`)
+    UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
 -- ----------------------------