You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/07 14:54:09 UTC
[incubator-inlong] branch master updated: [INLONG-4084][Manager] Add some fields for stream_source and stream_sink (#4093)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3d609d8a0 [INLONG-4084][Manager] Add some fields for stream_source and stream_sink (#4093)
3d609d8a0 is described below
commit 3d609d8a09c58c4630c18c6b8e0c26712605a181
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Sat May 7 22:54:04 2022 +0800
[INLONG-4084][Manager] Add some fields for stream_source and stream_sink (#4093)
* [INLONG-4084][Manager] Add some fields for stream_source and stream_sink
* [INLONG-4084][Manager] Delete dataNodeName for BinlogSourceDTO
* [INLONG-4084][Manager] Fix jdbcType errors in StreamSinkMapper.xml
---
.../client/api/source/MySQLBinlogSource.java | 2 +-
.../api/util/InlongStreamSourceTransfer.java | 2 +
.../manager/common/pojo/sink/SinkListResponse.java | 14 ++++-
.../manager/common/pojo/sink/SinkPageRequest.java | 12 ++++
.../manager/common/pojo/sink/SinkRequest.java | 12 ++++
.../manager/common/pojo/sink/SinkResponse.java | 12 ++++
.../common/pojo/source/SourceListResponse.java | 4 +-
.../manager/common/pojo/source/SourceRequest.java | 4 +-
.../manager/common/pojo/source/SourceResponse.java | 4 +-
.../common/pojo/source/binlog/BinlogSourceDTO.java | 2 +-
.../source/binlog/BinlogSourceListResponse.java | 3 +
.../pojo/source/binlog/BinlogSourceRequest.java | 3 +
.../pojo/source/binlog/BinlogSourceResponse.java | 3 +
.../manager/common/pojo/stream/StreamNode.java | 4 ++
.../manager/common/pojo/stream/StreamSink.java | 2 +
.../manager/common/pojo/stream/StreamSource.java | 2 +
.../manager/dao/entity/StreamSinkEntity.java | 4 ++
.../manager/dao/entity/StreamSourceEntity.java | 2 +-
.../resources/mappers/StreamSinkEntityMapper.xml | 70 ++++++++++++++++++++--
.../resources/mappers/StreamSourceEntityMapper.xml | 14 ++---
.../main/resources/sql/apache_inlong_manager.sql | 6 +-
.../manager-web/sql/apache_inlong_manager.sql | 6 +-
22 files changed, 163 insertions(+), 24 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 46653155c..0e06580ac 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -59,7 +59,7 @@ public class MySQLBinlogSource extends StreamSource {
private int port = 3306;
@ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
- private int serverId = 0;
+ private Integer serverId = 0;
@ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
+ "separate them with commas, for example: db1,test_db*",
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index a2b5096d1..6faca5297 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -115,6 +115,7 @@ public class InlongStreamSourceTransfer {
binlogSource.setAgentIp(response.getAgentIp());
binlogSource.setState(State.parseByStatus(response.getStatus()));
binlogSource.setServerId(response.getServerId());
+ binlogSource.setDataNodeName(response.getDataNodeName());
DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
response.getUser(),
response.getPassword());
@@ -193,6 +194,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setHostname(binlogSource.getHostname());
sourceRequest.setPort(binlogSource.getPort());
sourceRequest.setServerId(binlogSource.getServerId());
+ sourceRequest.setDataNodeName(binlogSource.getDataNodeName());
sourceRequest.setIncludeSchema(binlogSource.getIncludeSchema());
sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
index b8212a99b..0fe441e9c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
@@ -19,10 +19,10 @@ package org.apache.inlong.manager.common.pojo.sink;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
import lombok.Data;
import java.util.Date;
+import java.util.Map;
/**
* Response of the sink list
@@ -51,6 +51,18 @@ public class SinkListResponse {
@ApiModelProperty("Sink description")
private String description;
+ @ApiModelProperty("Inlong cluster name")
+ private String inlongClusterName;
+
+ @ApiModelProperty("Data node name")
+ private String dataNodeName;
+
+ @ApiModelProperty("Sort task name")
+ private String sortTaskName;
+
+ @ApiModelProperty("Sort consumer group")
+ private String sortConsumerGroup;
+
@ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. default is 1")
private Integer enableCreateResource;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
index ac368e45b..55990a534 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
@@ -49,4 +49,16 @@ public class SinkPageRequest extends PageRequest {
@ApiModelProperty(value = "Status")
private Integer status;
+ @ApiModelProperty("Inlong cluster name")
+ private String inlongClusterName;
+
+ @ApiModelProperty("Data node name")
+ private String dataNodeName;
+
+ @ApiModelProperty("Sort task name")
+ private String sortTaskName;
+
+ @ApiModelProperty("Sort consumer group")
+ private String sortConsumerGroup;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
index 52793f588..6c5d44358 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
@@ -57,6 +57,18 @@ public class SinkRequest {
@ApiModelProperty("Sink description")
private String description;
+ @ApiModelProperty("Inlong cluster name")
+ private String inlongClusterName;
+
+ @ApiModelProperty("Data node name")
+ private String dataNodeName;
+
+ @ApiModelProperty("Sort task name")
+ private String sortTaskName;
+
+ @ApiModelProperty("Sort consumer group")
+ private String sortConsumerGroup;
+
@ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. default is 1")
private Integer enableCreateResource = 1;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
index 6b25928fc..30f249237 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
@@ -53,6 +53,18 @@ public class SinkResponse {
@ApiModelProperty("Sink description")
private String description;
+ @ApiModelProperty("Inlong cluster name")
+ private String inlongClusterName;
+
+ @ApiModelProperty("Data node name")
+ private String dataNodeName;
+
+ @ApiModelProperty("Sort task name")
+ private String sortTaskName;
+
+ @ApiModelProperty("Sort consumer group")
+ private String sortConsumerGroup;
+
@ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. default is 1",
notes = "Such as create Hive table")
private Integer enableCreateResource = 1;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
index b69ab7b45..860bc1b43 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
@@ -47,8 +47,8 @@ public class SourceListResponse {
@ApiModelProperty("Data Serialization, support: csv, json, canal, avro, etc")
private String serializationType;
- @ApiModelProperty("Id of the source server")
- private Integer serverId;
+ @ApiModelProperty("Data node name")
+ private String dataNodeName;
@ApiModelProperty("Id of the cluster that collected this source")
private Integer clusterId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 8f4cd533a..177423797 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -56,8 +56,8 @@ public class SourceRequest {
@ApiModelProperty("Mac uuid of the agent running the task")
private String uuid;
- @ApiModelProperty("Id of the source server")
- private Integer serverId = 0;
+ @ApiModelProperty("Data node name")
+ private String dataNodeName;
@ApiModelProperty("Id of the cluster that collected this source")
private Integer clusterId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 75b177a6a..f0a678191 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -53,8 +53,8 @@ public class SourceResponse {
@ApiModelProperty("Mac uuid of the agent running the task")
private String uuid;
- @ApiModelProperty("Id of the source server")
- private Integer serverId;
+ @ApiModelProperty("Data node name")
+ private String dataNodeName;
@ApiModelProperty("Id of the cluster that collected this source")
private Integer clusterId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index 51d55163b..be6ec2c77 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -53,7 +53,7 @@ public class BinlogSourceDTO {
private int port;
@ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
- private int serverId;
+ private Integer serverId = 0;
@ApiModelProperty("Whether include schema, default is 'false'")
private String includeSchema;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 037da1717..b5779fab4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -43,6 +43,9 @@ public class BinlogSourceListResponse extends SourceListResponse {
@ApiModelProperty("Exposed port of the DB server")
private int port;
+ @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
+ private Integer serverId = 0;
+
@ApiModelProperty("Whether include schema, default is 'false'")
private String includeSchema;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index afc7e899e..a7f0d8ab7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -48,6 +48,9 @@ public class BinlogSourceRequest extends SourceRequest {
@ApiModelProperty("Exposed port of the DB server")
private int port = 3306;
+ @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
+ private Integer serverId = 0;
+
@ApiModelProperty("Whether include schema, default is 'false'")
private String includeSchema;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 69d51b492..ad1b87eaf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -50,6 +50,9 @@ public class BinlogSourceResponse extends SourceResponse {
@ApiModelProperty("Exposed port of the DB server")
private int port;
+ @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
+ private Integer serverId = 0;
+
@ApiModelProperty("Whether include schema, default is 'false'")
private String includeSchema;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
index bcea68d3c..cc0491cdd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.common.pojo.stream;
import com.google.common.collect.Sets;
+import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -33,6 +34,9 @@ public class StreamNode {
protected List<StreamField> fields;
+ @ApiModelProperty("Data node name")
+ protected String dataNodeName;
+
public void addPre(String pre) {
Preconditions.checkNotEmpty(pre, "Pre node should not be empty");
if (preNodes == null) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java
index fc91d4396..a11a737fb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.stream;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.SinkType;
@@ -27,6 +28,7 @@ import java.util.List;
import java.util.Map;
@Data
+@EqualsAndHashCode(callSuper = true)
@ApiModel("Stream sink configuration")
public abstract class StreamSink extends StreamNode {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java
index 3ac1dac24..7c111f2e5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java
@@ -20,11 +20,13 @@ package org.apache.inlong.manager.common.pojo.stream;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.SourceType;
@Data
+@EqualsAndHashCode(callSuper = true)
@ApiModel("Stream source configuration")
public abstract class StreamSource extends StreamNode {
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java
index cd49ad404..cc64ee62e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java
@@ -32,6 +32,10 @@ public class StreamSinkEntity implements Serializable {
private String sinkType;
private String sinkName;
private String description;
+ private String inlongClusterName;
+ private String dataNodeName;
+ private String sortTaskName;
+ private String sortConsumerGroup;
private Integer enableCreateResource;
private String operateLog;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 6c68f0198..d51e62886 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -34,7 +34,7 @@ public class StreamSourceEntity implements Serializable {
private String agentIp;
private String uuid;
- private Integer serverId;
+ private String dataNodeName;
private Integer clusterId;
private String serializationType;
private String snapshot;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index fd1c08126..db1ce3f7d 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -29,6 +29,10 @@
<result column="sink_name" jdbcType="VARCHAR" property="sinkName"/>
<result column="description" jdbcType="INTEGER" property="description"/>
<result column="enable_create_resource" jdbcType="TINYINT" property="enableCreateResource"/>
+ <result column="inlong_cluster_name" jdbcType="VARCHAR" property="inlongClusterName"/>
+ <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/>
+ <result column="sort_task_name" jdbcType="VARCHAR" property="sortTaskName"/>
+ <result column="sort_consumer_group" jdbcType="VARCHAR" property="sortConsumerGroup"/>
<result column="ext_params" jdbcType="VARCHAR" property="extParams"/>
<result column="operate_log" jdbcType="VARCHAR" property="operateLog"/>
@@ -42,10 +46,10 @@
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, inlong_stream_id, sink_type, sink_name, description, enable_create_resource, ext_params,
- operate_log, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
+ id, inlong_group_id, inlong_stream_id, sink_type, sink_name, description, enable_create_resource,
+ inlong_cluster_name, data_node_name, sort_task_name, sort_consumer_group, ext_params, operate_log,
+ status, previous_status, is_deleted, creator, modifier, create_time, modify_time
</sql>
-
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
@@ -90,6 +94,18 @@
<if test="request.status != null and request.status != ''">
and status = #{request.status, jdbcType=INTEGER}
</if>
+ <if test="request.inlongClusterName != null and request.inlongClusterName != ''">
+ and inlong_cluster_name = #{request.inlongClusterName, jdbcType=VARCHAR}
+ </if>
+ <if test="request.dataNodeName != null and request.dataNodeName != ''">
+ and data_node_name = #{request.dataNodeName, jdbcType=VARCHAR}
+ </if>
+ <if test="request.sortTaskName != null and request.sortTaskName != ''">
+ and sort_task_name = #{request.sortTaskName, jdbcType=VARCHAR}
+ </if>
+ <if test="request.sortConsumerGroup != null and request.sortConsumerGroup != ''">
+ and sort_consumer_group = #{request.sortConsumerGroup, jdbcType=VARCHAR}
+ </if>
order by modify_time desc
</where>
</select>
@@ -200,13 +216,17 @@
parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
insert into stream_sink (id, inlong_group_id, inlong_stream_id,
sink_type, sink_name, description,
- enable_create_resource, ext_params,
+ enable_create_resource, inlong_cluster_name,
+ data_node_name, sort_task_name,
+ sort_consumer_group, ext_params,
operate_log, status, previous_status,
is_deleted, creator, modifier,
create_time, modify_time)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sinkType,jdbcType=VARCHAR}, #{sinkName,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR},
- #{enableCreateResource,jdbcType=TINYINT}, #{extParams,jdbcType=VARCHAR},
+ #{enableCreateResource,jdbcType=TINYINT}, #{inlongClusterName,jdbcType=VARCHAR},
+ #{dataNodeName,jdbcType=VARCHAR}, #{sortTaskName,jdbcType=VARCHAR},
+ #{sortConsumerGroup,jdbcType=VARCHAR}, #{extParams,jdbcType=VARCHAR},
#{operateLog,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
#{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
#{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
@@ -236,6 +256,18 @@
<if test="enableCreateResource != null">
enable_create_resource,
</if>
+ <if test="inlongClusterName != null">
+ inlong_cluster_name,
+ </if>
+ <if test="dataNodeName != null">
+ data_node_name,
+ </if>
+ <if test="sortTaskName != null">
+ sort_task_name,
+ </if>
+ <if test="sortConsumerGroup != null">
+ sort_consumer_group,
+ </if>
<if test="extParams != null">
ext_params,
</if>
@@ -286,6 +318,18 @@
<if test="enableCreateResource != null">
#{enableCreateResource,jdbcType=TINYINT},
</if>
+ <if test="inlongClusterName != null">
+ #{inlongClusterName,jdbcType=VARCHAR},
+ </if>
+ <if test="dataNodeName != null">
+ #{dataNodeName,jdbcType=VARCHAR},
+ </if>
+ <if test="sortTaskName != null">
+ #{sortTaskName,jdbcType=VARCHAR},
+ </if>
+ <if test="sortConsumerGroup != null">
+ #{sortConsumerGroup,jdbcType=VARCHAR},
+ </if>
<if test="extParams != null">
#{extParams,jdbcType=VARCHAR},
</if>
@@ -338,6 +382,18 @@
<if test="enableCreateResource != null">
enable_create_resource = #{enableCreateResource,jdbcType=TINYINT},
</if>
+ <if test="inlongClusterName != null">
+ inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR},
+ </if>
+ <if test="dataNodeName != null">
+ data_node_name = #{dataNodeName,jdbcType=VARCHAR},
+ </if>
+ <if test="sortTaskName != null">
+ sort_task_name = #{sortTaskName,jdbcType=VARCHAR},
+ </if>
+ <if test="sortConsumerGroup != null">
+ sort_consumer_group = #{sortConsumerGroup,jdbcType=VARCHAR},
+ </if>
<if test="extParams != null">
ext_params = #{extParams,jdbcType=VARCHAR},
</if>
@@ -376,6 +432,10 @@
sink_name = #{sinkName,jdbcType=VARCHAR},
description = #{description,jdbcType=VARCHAR},
enable_create_resource = #{enableCreateResource,jdbcType=TINYINT},
+ inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR},
+ data_node_name = #{dataNodeName,jdbcType=VARCHAR},
+ sort_task_name = #{sortTaskName,jdbcType=VARCHAR},
+ sort_consumer_group = #{sortConsumerGroup,jdbcType=VARCHAR},
ext_params = #{extParams,jdbcType=VARCHAR},
operate_log = #{operateLog,jdbcType=VARCHAR},
status = #{status,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c4787329b..31e3e65da 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -29,7 +29,7 @@
<result column="source_name" jdbcType="VARCHAR" property="sourceName"/>
<result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
<result column="uuid" jdbcType="VARCHAR" property="uuid"/>
- <result column="server_id" jdbcType="INTEGER" property="serverId"/>
+ <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/>
<result column="cluster_id" jdbcType="INTEGER" property="clusterId"/>
<result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/>
<result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
@@ -46,7 +46,7 @@
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, source_type, source_name, agent_ip, uuid,
- server_id, cluster_id, serialization_type, snapshot, report_time, ext_params,
+ data_node_name, cluster_id, serialization_type, snapshot, report_time, ext_params,
version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
</sql>
@@ -54,14 +54,14 @@
parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
insert into stream_source (id, inlong_group_id, inlong_stream_id,
source_type, source_name, agent_ip,
- uuid, server_id, cluster_id,
+ uuid, data_node_name, cluster_id,
serialization_type, snapshot,
report_time, ext_params, version,
status, previous_status, is_deleted,
creator, modifier, create_time, modify_time)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR},
- #{uuid,jdbcType=VARCHAR}, #{serverId,jdbcType=INTEGER}, #{clusterId,jdbcType=INTEGER},
+ #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR}, #{clusterId,jdbcType=INTEGER},
#{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
#{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{version,jdbcType=INTEGER},
#{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
@@ -238,8 +238,8 @@
<if test="uuid != null">
uuid = #{uuid,jdbcType=VARCHAR},
</if>
- <if test="serverId != null">
- server_id = #{serverId,jdbcType=INTEGER},
+ <if test="dataNodeName != null">
+ data_node_name = #{dataNodeName,jdbcType=VARCHAR},
</if>
<if test="clusterId != null">
cluster_id = #{clusterId,jdbcType=INTEGER},
@@ -291,7 +291,7 @@
source_name = #{sourceName,jdbcType=VARCHAR},
agent_ip = #{agentIp,jdbcType=VARCHAR},
uuid = #{uuid,jdbcType=VARCHAR},
- server_id = #{serverId,jdbcType=INTEGER},
+ data_node_name = #{dataNodeName,jdbcType=VARCHAR},
cluster_id = #{clusterId,jdbcType=INTEGER},
serialization_type = #{serializationType,jdbcType=VARCHAR},
snapshot = #{snapshot,jdbcType=LONGVARCHAR},
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 315378904..4ef5d7498 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -528,7 +528,7 @@ CREATE TABLE `stream_source`
`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
- `server_id` int(11) DEFAULT NULL COMMENT 'Id of the source server',
+ `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
`cluster_id` int(11) DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
`snapshot` text DEFAULT NULL COMMENT 'Snapshot of this source task',
@@ -585,6 +585,10 @@ CREATE TABLE `stream_sink`
`sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
`description` varchar(500) NULL COMMENT 'Sink description',
`enable_create_resource` tinyint(2) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
+ `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name, which links to inlong_cluster table',
+ `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
+ `sort_task_name` varchar(512) DEFAULT NULL COMMENT 'Sort task name or task ID',
+ `sort_consumer_group` varchar(512) DEFAULT NULL COMMENT 'Consumer group name for Sort task',
`ext_params` text COMMENT 'Another fields, will saved as JSON type',
`operate_log` varchar(5000) DEFAULT NULL COMMENT 'Background operate log',
`status` int(11) DEFAULT '0' COMMENT 'Status',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index c3895f763..7a8e7bde0 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -555,7 +555,7 @@ CREATE TABLE `stream_source`
`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
- `server_id` int(11) DEFAULT NULL COMMENT 'Id of the source server',
+ `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
`cluster_id` int(11) DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
`snapshot` text DEFAULT NULL COMMENT 'Snapshot of this source task',
@@ -614,6 +614,10 @@ CREATE TABLE `stream_sink`
`sink_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
`description` varchar(500) NULL COMMENT 'Sink description',
`enable_create_resource` tinyint(1) DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
+ `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name, which links to inlong_cluster table',
+ `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
+ `sort_task_name` varchar(512) DEFAULT NULL COMMENT 'Sort task name or task ID',
+ `sort_consumer_group` varchar(512) DEFAULT NULL COMMENT 'Consumer group name for Sort task',
`ext_params` text NULL COMMENT 'Another fields, will saved as JSON type',
`operate_log` text DEFAULT NULL COMMENT 'Background operate log',
`status` int(11) DEFAULT '0' COMMENT 'Status',