You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/27 05:13:14 UTC
[incubator-inlong] branch master updated: [INLONG-3958][Manager] Fix some null pointer exceptions (#3959)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 19992c6ea [INLONG-3958][Manager] Fix some null pointer exceptions (#3959)
19992c6ea is described below
commit 19992c6eab1a630870235c55278e0642cb5aa147
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Apr 27 13:13:08 2022 +0800
[INLONG-3958][Manager] Fix some null pointer exceptions (#3959)
* Fix NPE in Manager Service
* InsertSelective->Insert
* fix NPE in manager service
* remove unused import
---
.../client/api/source/MySQLBinlogSource.java | 6 +-
.../client/api/util/InlongGroupTransfer.java | 2 +-
.../api/util/InlongStreamSourceTransfer.java | 2 +-
.../inlong/manager/common/enums/FieldType.java | 2 +-
.../manager/common/pojo/source/SourceRequest.java | 2 +-
.../mappers/StreamSourceFieldEntityMapper.xml | 19 +-
.../mappers/StreamTransformEntityMapper.xml | 422 +++++++++++----------
.../mappers/StreamTransformFieldEntityMapper.xml | 20 +-
.../service/core/impl/InlongGroupServiceImpl.java | 10 +-
.../service/sort/util/ExtractNodeUtils.java | 3 +
.../service/sort/util/FilterFunctionUtils.java | 3 +
.../manager/service/sort/util/LoadNodeUtils.java | 7 +-
.../service/sort/util/NodeRelationShipUtils.java | 4 +
.../service/sort/util/TransformNodeUtils.java | 2 +
.../service/source/AbstractSourceOperation.java | 4 +-
.../service/source/StreamSourceServiceImpl.java | 1 +
.../transform/StreamTransformServiceImpl.java | 10 +-
.../service/workflow/WorkflowServiceImplTest.java | 6 +-
.../main/resources/sql/apache_inlong_manager.sql | 6 +-
19 files changed, 285 insertions(+), 246 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 11aa256d9..46653155c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -23,10 +23,10 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.DataFormat;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
import java.util.List;
@@ -59,7 +59,7 @@ public class MySQLBinlogSource extends StreamSource {
private int port = 3306;
@ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
- private int serverId;
+ private int serverId = 0;
@ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+ "separate them with commas, for example: db1,test_db*",
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 1b830ab2a..2930ae512 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -34,8 +34,8 @@ import org.apache.inlong.manager.client.api.auth.Authentication;
import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.client.api.auth.TokenAuthentication;
-import org.apache.inlong.manager.common.enums.GroupMode;
import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.GroupMode;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index ea6a7238d..dc7847fe8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -280,7 +280,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setPassword(authentication.getPassword());
sourceRequest.setHostname(binlogSource.getHostname());
sourceRequest.setPort(binlogSource.getPort());
- binlogSource.setServerId(binlogSource.getServerId());
+ sourceRequest.setServerId(binlogSource.getServerId());
sourceRequest.setIncludeSchema(binlogSource.getIncludeSchema());
sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index 5cb23d012..60433ca35 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -48,7 +48,7 @@ public enum FieldType {
public static FieldType forName(String name) {
Preconditions.checkNotNull(name, "FieldType should not be null");
for (FieldType value : values()) {
- if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
+ if (value.toString().equals(name) || value.toString().equals(name.toLowerCase(Locale.ROOT))) {
return value;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 6cb66e5e8..8f4cd533a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -57,7 +57,7 @@ public class SourceRequest {
private String uuid;
@ApiModelProperty("Id of the source server")
- private Integer serverId;
+ private Integer serverId = 0;
@ApiModelProperty("Id of the cluster that collected this source")
private Integer clusterId;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
index 123384e72..065bee1a1 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -54,7 +54,8 @@
from stream_source_field
where id = #{id,jdbcType=INTEGER}
</delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
source_id, source_type, field_name,
field_value, pre_expression, field_type,
@@ -66,7 +67,8 @@
#{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
#{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
insert into stream_source_field
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -229,11 +231,14 @@
rank_num, is_deleted)
values
<foreach collection="list" index="index" item="item" separator=",">
- (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
- #{sourceId,jdbcType=INTEGER}, #{sourceType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
- #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
- #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ (#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
+ #{item.inlongStreamId,jdbcType=VARCHAR},
+ #{item.sourceId,jdbcType=INTEGER}, #{item.sourceType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
+ #{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
+ #{item.fieldType,jdbcType=VARCHAR},
+ #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
+ #{item.fieldFormat,jdbcType=VARCHAR},
+ #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
</foreach>
</insert>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
index 6cddcec46..135f95704 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
@@ -20,215 +20,219 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
- <id column="id" jdbcType="INTEGER" property="id" />
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
- <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
- <result column="transform_name" jdbcType="VARCHAR" property="transformName" />
- <result column="transform_type" jdbcType="VARCHAR" property="transformType" />
- <result column="pre_node_names" jdbcType="VARCHAR" property="preNodeNames" />
- <result column="post_node_names" jdbcType="VARCHAR" property="postNodeNames" />
- <result column="transform_definition" jdbcType="VARCHAR" property="transformDefinition" />
- <result column="version" jdbcType="INTEGER" property="version" />
- <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
- <result column="creator" jdbcType="VARCHAR" property="creator" />
- <result column="modifier" jdbcType="VARCHAR" property="modifier" />
- <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
- <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
- </resultMap>
- <sql id="Base_Column_List">
- id, inlong_group_id, inlong_stream_id, transform_name, transform_type, pre_node_names,
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+ <result column="transform_name" jdbcType="VARCHAR" property="transformName"/>
+ <result column="transform_type" jdbcType="VARCHAR" property="transformType"/>
+ <result column="pre_node_names" jdbcType="VARCHAR" property="preNodeNames"/>
+ <result column="post_node_names" jdbcType="VARCHAR" property="postNodeNames"/>
+ <result column="transform_definition" jdbcType="VARCHAR" property="transformDefinition"/>
+ <result column="version" jdbcType="INTEGER" property="version"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ <result column="creator" jdbcType="VARCHAR" property="creator"/>
+ <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+ </resultMap>
+ <sql id="Base_Column_List">
+ id
+ , inlong_group_id, inlong_stream_id, transform_name, transform_type, pre_node_names,
post_node_names, transform_definition, version, is_deleted, creator, modifier, create_time,
modify_time
- </sql>
- <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
- select
- <include refid="Base_Column_List" />
- from stream_transform
- where id = #{id,jdbcType=INTEGER}
- </select>
- <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
- select
- <include refid="Base_Column_List"/>
- from stream_transform
- <where>
- is_deleted = 0
- and inlong_group_id = #{groupId, jdbcType=VARCHAR}
- <if test="streamId != null and streamId != ''">
- and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
- </if>
- <if test="transformName != null and transformName != ''">
- and transform_name = #{transformName, jdbcType=VARCHAR}
- </if>
- </where>
- </select>
- <delete id="deleteById" parameterType="java.lang.Integer">
- delete from stream_transform
- where id = #{id,jdbcType=INTEGER}
- </delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
- insert into stream_transform (id, inlong_group_id, inlong_stream_id,
- transform_name, transform_type, pre_node_names,
- post_node_names, transform_definition, version,
- is_deleted, creator, modifier,
- create_time, modify_time)
- values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
- #{transformName,jdbcType=VARCHAR}, #{transformType,jdbcType=VARCHAR}, #{preNodeNames,jdbcType=VARCHAR},
- #{postNodeNames,jdbcType=VARCHAR}, #{transformDefinition,jdbcType=VARCHAR}, #{version,jdbcType=INTEGER},
- #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
- #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
- </insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
- insert into stream_transform
- <trim prefix="(" suffix=")" suffixOverrides=",">
- <if test="id != null">
- id,
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id,
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id,
- </if>
- <if test="transformName != null">
- transform_name,
- </if>
- <if test="transformType != null">
- transform_type,
- </if>
- <if test="preNodeNames != null">
- pre_node_names,
- </if>
- <if test="postNodeNames != null">
- post_node_names,
- </if>
- <if test="transformDefinition != null">
- transform_definition,
- </if>
- <if test="version != null">
- version,
- </if>
- <if test="isDeleted != null">
- is_deleted,
- </if>
- <if test="creator != null">
- creator,
- </if>
- <if test="modifier != null">
- modifier,
- </if>
- <if test="createTime != null">
- create_time,
- </if>
- <if test="modifyTime != null">
- modify_time,
- </if>
- </trim>
- <trim prefix="values (" suffix=")" suffixOverrides=",">
- <if test="id != null">
- #{id,jdbcType=INTEGER},
- </if>
- <if test="inlongGroupId != null">
- #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="transformName != null">
- #{transformName,jdbcType=VARCHAR},
- </if>
- <if test="transformType != null">
- #{transformType,jdbcType=VARCHAR},
- </if>
- <if test="preNodeNames != null">
- #{preNodeNames,jdbcType=VARCHAR},
- </if>
- <if test="postNodeNames != null">
- #{postNodeNames,jdbcType=VARCHAR},
- </if>
- <if test="transformDefinition != null">
- #{transformDefinition,jdbcType=VARCHAR},
- </if>
- <if test="version != null">
- #{version,jdbcType=INTEGER},
- </if>
- <if test="isDeleted != null">
- #{isDeleted,jdbcType=INTEGER},
- </if>
- <if test="creator != null">
- #{creator,jdbcType=VARCHAR},
- </if>
- <if test="modifier != null">
- #{modifier,jdbcType=VARCHAR},
- </if>
- <if test="createTime != null">
- #{createTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- </trim>
- </insert>
- <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
- update stream_transform
- <set>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="inlongStreamId != null">
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="transformName != null">
- transform_name = #{transformName,jdbcType=VARCHAR},
- </if>
- <if test="transformType != null">
- transform_type = #{transformType,jdbcType=VARCHAR},
- </if>
- <if test="preNodeNames != null">
- pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
- </if>
- <if test="postNodeNames != null">
- post_node_names = #{postNodeNames,jdbcType=VARCHAR},
- </if>
- <if test="transformDefinition != null">
- transform_definition = #{transformDefinition,jdbcType=VARCHAR},
- </if>
- <if test="version != null">
- version = #{version,jdbcType=INTEGER},
- </if>
- <if test="isDeleted != null">
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- </if>
- <if test="creator != null">
- creator = #{creator,jdbcType=VARCHAR},
- </if>
- <if test="modifier != null">
- modifier = #{modifier,jdbcType=VARCHAR},
- </if>
- <if test="createTime != null">
- create_time = #{createTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- </set>
- where id = #{id,jdbcType=INTEGER}
- </update>
- <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
- update stream_transform
- set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- transform_name = #{transformName,jdbcType=VARCHAR},
- transform_type = #{transformType,jdbcType=VARCHAR},
- pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
- post_node_names = #{postNodeNames,jdbcType=VARCHAR},
- transform_definition = #{transformDefinition,jdbcType=VARCHAR},
- version = #{version,jdbcType=INTEGER},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- creator = #{creator,jdbcType=VARCHAR},
- modifier = #{modifier,jdbcType=VARCHAR},
- create_time = #{createTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP}
- where id = #{id,jdbcType=INTEGER}
- </update>
+ </sql>
+ <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_transform
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_transform
+ <where>
+ is_deleted = 0
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ <if test="streamId != null and streamId != ''">
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
+ <if test="transformName != null and transformName != ''">
+ and transform_name = #{transformName, jdbcType=VARCHAR}
+ </if>
+ </where>
+ </select>
+ <delete id="deleteById" parameterType="java.lang.Integer">
+ delete
+ from stream_transform
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ insert into stream_transform (id, inlong_group_id, inlong_stream_id,
+ transform_name, transform_type, pre_node_names,
+ post_node_names, transform_definition, version,
+ is_deleted, creator, modifier,
+ create_time, modify_time)
+ values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ #{transformName,jdbcType=VARCHAR}, #{transformType,jdbcType=VARCHAR}, #{preNodeNames,jdbcType=VARCHAR},
+ #{postNodeNames,jdbcType=VARCHAR}, #{transformDefinition,jdbcType=VARCHAR}, #{version,jdbcType=INTEGER},
+ #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
+ #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
+ </insert>
+ <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ insert into stream_transform
+ <trim prefix="(" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ id,
+ </if>
+ <if test="inlongGroupId != null">
+ inlong_group_id,
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id,
+ </if>
+ <if test="transformName != null">
+ transform_name,
+ </if>
+ <if test="transformType != null">
+ transform_type,
+ </if>
+ <if test="preNodeNames != null">
+ pre_node_names,
+ </if>
+ <if test="postNodeNames != null">
+ post_node_names,
+ </if>
+ <if test="transformDefinition != null">
+ transform_definition,
+ </if>
+ <if test="version != null">
+ version,
+ </if>
+ <if test="isDeleted != null">
+ is_deleted,
+ </if>
+ <if test="creator != null">
+ creator,
+ </if>
+ <if test="modifier != null">
+ modifier,
+ </if>
+ <if test="createTime != null">
+ create_time,
+ </if>
+ <if test="modifyTime != null">
+ modify_time,
+ </if>
+ </trim>
+ <trim prefix="values (" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ #{id,jdbcType=INTEGER},
+ </if>
+ <if test="inlongGroupId != null">
+ #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="transformName != null">
+ #{transformName,jdbcType=VARCHAR},
+ </if>
+ <if test="transformType != null">
+ #{transformType,jdbcType=VARCHAR},
+ </if>
+ <if test="preNodeNames != null">
+ #{preNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="postNodeNames != null">
+ #{postNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="transformDefinition != null">
+ #{transformDefinition,jdbcType=VARCHAR},
+ </if>
+ <if test="version != null">
+ #{version,jdbcType=INTEGER},
+ </if>
+ <if test="isDeleted != null">
+ #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="creator != null">
+ #{creator,jdbcType=VARCHAR},
+ </if>
+ <if test="modifier != null">
+ #{modifier,jdbcType=VARCHAR},
+ </if>
+ <if test="createTime != null">
+ #{createTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="modifyTime != null">
+ #{modifyTime,jdbcType=TIMESTAMP},
+ </if>
+ </trim>
+ </insert>
+ <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ update stream_transform
+ <set>
+ <if test="inlongGroupId != null">
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="transformName != null">
+ transform_name = #{transformName,jdbcType=VARCHAR},
+ </if>
+ <if test="transformType != null">
+ transform_type = #{transformType,jdbcType=VARCHAR},
+ </if>
+ <if test="preNodeNames != null">
+ pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="postNodeNames != null">
+ post_node_names = #{postNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="transformDefinition != null">
+ transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+ </if>
+ <if test="version != null">
+ version = #{version,jdbcType=INTEGER},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="creator != null">
+ creator = #{creator,jdbcType=VARCHAR},
+ </if>
+ <if test="modifier != null">
+ modifier = #{modifier,jdbcType=VARCHAR},
+ </if>
+ <if test="createTime != null">
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="modifyTime != null">
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+ </if>
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ update stream_transform
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ transform_name = #{transformName,jdbcType=VARCHAR},
+ transform_type = #{transformType,jdbcType=VARCHAR},
+ pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
+ post_node_names = #{postNodeNames,jdbcType=VARCHAR},
+ transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+ version = #{version,jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ creator = #{creator,jdbcType=VARCHAR},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
index ff8aaa034..7e058bc68 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformFieldEntityMapper.xml
@@ -70,7 +70,8 @@
from stream_transform_field
where id = #{id,jdbcType=INTEGER}
</delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
+ <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
insert into stream_transform_field (id, inlong_group_id, inlong_stream_id,
transform_id, transform_type, field_name,
field_value, pre_expression, field_type,
@@ -82,7 +83,8 @@
#{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
#{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
</insert>
- <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
+ <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
+ parameterType="org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity">
insert into stream_transform_field
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -245,11 +247,15 @@
rank_num, is_deleted)
values
<foreach collection="list" index="index" item="item" separator=",">
- (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
- #{transformId,jdbcType=INTEGER}, #{transformType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
- #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
- #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
- #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ (#{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
+ #{item.inlongStreamId,jdbcType=VARCHAR},
+ #{item.transformId,jdbcType=INTEGER}, #{item.transformType,jdbcType=VARCHAR},
+ #{item.fieldName,jdbcType=VARCHAR},
+ #{item.fieldValue,jdbcType=VARCHAR}, #{item.preExpression,jdbcType=VARCHAR},
+ #{item.fieldType,jdbcType=VARCHAR},
+ #{item.fieldComment,jdbcType=VARCHAR}, #{item.isMetaField,jdbcType=SMALLINT},
+ #{item.fieldFormat,jdbcType=VARCHAR},
+ #{item.rankNum,jdbcType=SMALLINT}, #{item.isDeleted,jdbcType=INTEGER})
</foreach>
</insert>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index 4228bb4b2..24be9d000 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -128,14 +128,11 @@ public class InlongGroupServiceImpl implements InlongGroupService {
entity.setCreateTime(new Date());
groupMapper.insertSelective(entity);
this.saveOrUpdateExt(groupId, groupInfo.getExtList());
-
+ // Saving MQ information.
Middleware mqMiddleware = groupMqFactory.getMqMiddleware(MQType.forType(groupInfo.getMiddlewareType()));
-
- if (StringUtils.isBlank(groupInfo.getMqExtInfo().getInlongGroupId())) {
+ if (groupInfo.getMqExtInfo() != null && StringUtils.isBlank(groupInfo.getMqExtInfo().getInlongGroupId())) {
groupInfo.getMqExtInfo().setInlongGroupId(groupId);
}
-
- // Saving MQ information.
mqMiddleware.save(groupInfo.getMqExtInfo());
LOGGER.debug("success to save inlong group info for groupId={}", groupId);
return groupId;
@@ -163,7 +160,6 @@ public class InlongGroupServiceImpl implements InlongGroupService {
groupInfo.setMqExtInfo(mq.get(groupId));
// For approved inlong group, encapsulate the cluster address of the middleware
-
if (GroupStatus.CONFIG_SUCCESSFUL == GroupStatus.forCode(groupInfo.getStatus())) {
groupInfo = mq.packSpecificInfo(groupInfo);
}
@@ -234,7 +230,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
MQType mqType = MQType.forType(groupRequest.getMiddlewareType());
Middleware mqMiddleware = groupMqFactory.getMqMiddleware(mqType);
InlongGroupMqExtBase mqExtInfo = groupRequest.getMqExtInfo();
- if (StringUtils.isBlank(mqExtInfo.getInlongGroupId())) {
+ if (mqExtInfo != null && StringUtils.isBlank(mqExtInfo.getInlongGroupId())) {
mqExtInfo.setInlongGroupId(groupId);
}
mqMiddleware.update(mqExtInfo);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 965065a34..9198c5754 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sort.util;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
@@ -46,6 +47,7 @@ import java.util.stream.Collectors;
/**
* Parse SourceResponse to ExtractNode which sort needed
*/
+@Slf4j
public class ExtractNodeUtils {
public static List<ExtractNode> createExtractNodes(List<SourceResponse> sourceResponses) {
@@ -111,6 +113,7 @@ public class ExtractNodeUtils {
/**
* Create KafkaExtractNode based KafkaSourceResponse
+ *
* @param kafkaSourceResponse
* @return
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
index 76c2d820e..bb24eb87d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
@@ -108,6 +108,9 @@ public class FilterFunctionUtils {
}
private static LogicOperator parseLogicOperator(RuleRelation relation) {
+ if (relation == null) {
+ return EmptyOperator.getInstance();
+ }
switch (relation) {
case OR:
return OrOperator.getInstance();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 155c46596..e56feafb9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sort.util;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
@@ -61,6 +62,7 @@ public class LoadNodeUtils {
}
public static KafkaLoadNode createLoadNode(KafkaSinkResponse kafkaSinkResponse) {
+
String id = kafkaSinkResponse.getSinkName();
String name = kafkaSinkResponse.getSinkName();
String topicName = kafkaSinkResponse.getTopicName();
@@ -74,7 +76,10 @@ public class LoadNodeUtils {
.map(fieldInfo -> new FieldRelationShip(fieldInfo, fieldInfo)).collect(Collectors.toList());
Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
- Integer sinkParallelism = Integer.parseInt(kafkaSinkResponse.getPartitionNum());
+ Integer sinkParallelism = null;
+ if (StringUtils.isNotEmpty(kafkaSinkResponse.getPartitionNum())) {
+ sinkParallelism = Integer.parseInt(kafkaSinkResponse.getPartitionNum());
+ }
DataTypeEnum dataType = DataTypeEnum.forName(kafkaSinkResponse.getSerializationType());
Format format;
switch (dataType) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index 646acc085..ea004bbf1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sort.util;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.inlong.manager.common.enums.TransformType;
@@ -75,6 +76,9 @@ public class NodeRelationShipUtils {
}
public static void optimizeNodeRelationShips(StreamInfo streamInfo, List<TransformResponse> transformResponses) {
+ if (CollectionUtils.isEmpty(transformResponses)) {
+ return;
+ }
Map<String, TransformDefinition> transformTypeMap = transformResponses.stream().collect(
Collectors.toMap(transformResponse -> transformResponse.getTransformName(),
transformResponse -> {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index 77e65ed35..e913229cb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.sort.util;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.TransformType;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -37,6 +38,7 @@ import java.util.stream.Collectors;
/**
* Parse TransformResponse to TransformNode which sort needed
*/
+@Slf4j
public class TransformNodeUtils {
public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index 4cbf4ed58..6382e6c7d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -101,7 +102,7 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
Date now = new Date();
entity.setCreateTime(now);
entity.setModifyTime(now);
- entity.setIsDeleted(0);
+ entity.setIsDeleted(GlobalConstants.UN_DELETED);
// get the ext params
setTargetEntity(request, entity);
sourceMapper.insert(entity);
@@ -242,6 +243,7 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
fieldEntity.setInlongStreamId(streamId);
fieldEntity.setSourceId(sourceId);
fieldEntity.setSourceType(sourceType);
+ fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
entityList.add(fieldEntity);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 7e029328f..5e3929e17 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -110,6 +110,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
List<SourceResponse> responseList = new ArrayList<>();
entityList.forEach(entity -> responseList.add(this.get(entity.getId(), entity.getSourceType())));
+
LOGGER.debug("success to list source by groupId={}, streamId={}", groupId, streamId);
return responseList;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index f5c90a84d..42785b8c4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
@@ -79,12 +80,14 @@ public class StreamTransformServiceImpl implements StreamTransformService {
}
StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
StreamTransformEntity::new);
+ transformEntity.setVersion(0);
transformEntity.setCreator(operator);
transformEntity.setModifier(operator);
Date now = new Date();
transformEntity.setCreateTime(now);
transformEntity.setModifyTime(now);
- transformEntityMapper.insertSelective(transformEntity);
+ transformEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+ transformEntityMapper.insert(transformEntity);
saveFieldOpt(transformEntity, transformRequest.getFieldList());
return transformEntity.getId();
}
@@ -115,7 +118,9 @@ public class StreamTransformServiceImpl implements StreamTransformService {
transformResponses.stream().forEach(transformResponse -> {
int transformId = transformResponse.getId();
List<InlongStreamFieldInfo> fieldInfos = fieldInfoMap.get(transformId);
- transformResponse.setFieldList(fieldInfos);
+ if (CollectionUtils.isNotEmpty(fieldInfos)) {
+ transformResponse.setFieldList(fieldInfos);
+ }
});
return transformResponses;
}
@@ -215,6 +220,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
fieldEntity.setInlongStreamId(streamId);
fieldEntity.setTransformId(transformId);
fieldEntity.setTransformType(transformType);
+ fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
entityList.add(fieldEntity);
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 4d8b5e56b..99878a6f6 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -61,8 +61,11 @@ import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.apache.inlong.manager.workflow.util.WorkflowBeanUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import springfox.boot.starter.autoconfigure.OpenApiAutoConfiguration;
import java.util.ArrayList;
import java.util.Collections;
@@ -72,7 +75,8 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@EnableAutoConfiguration
+@RunWith(SpringJUnit4ClassRunner.class)
+@EnableAutoConfiguration(exclude = OpenApiAutoConfiguration.class)
public class WorkflowServiceImplTest extends ServiceBaseTest {
public static final String OPERATOR = "admin";
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 64a388729..d797e2091 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -1165,8 +1165,7 @@ CREATE TABLE `group_heartbeat`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
- KEY `index_report_time` (`report_time`)
+ UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
@@ -1187,8 +1186,7 @@ CREATE TABLE `stream_heartbeat`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
- KEY `index_report_time` (`report_time`)
+ UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
-- ----------------------------