You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/07 14:54:09 UTC

[incubator-inlong] branch master updated: [INLONG-4084][Manager] Add some fields for stream_source and stream_sink (#4093)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d609d8a0 [INLONG-4084][Manager] Add some fields for stream_source and stream_sink (#4093)
3d609d8a0 is described below

commit 3d609d8a09c58c4630c18c6b8e0c26712605a181
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Sat May 7 22:54:04 2022 +0800

    [INLONG-4084][Manager] Add some fields for stream_source and stream_sink (#4093)
    
    * [INLONG-4084][Manager] Add some fields for stream_source and stream_sink
    
    * [INLONG-4084][Manager] Delete dataNodeName for BinlogSourceDTO
    
    * [INLONG-4084][Manager] Fix jdbcType errors in StreamSinkMapper.xml
---
 .../client/api/source/MySQLBinlogSource.java       |  2 +-
 .../api/util/InlongStreamSourceTransfer.java       |  2 +
 .../manager/common/pojo/sink/SinkListResponse.java | 14 ++++-
 .../manager/common/pojo/sink/SinkPageRequest.java  | 12 ++++
 .../manager/common/pojo/sink/SinkRequest.java      | 12 ++++
 .../manager/common/pojo/sink/SinkResponse.java     | 12 ++++
 .../common/pojo/source/SourceListResponse.java     |  4 +-
 .../manager/common/pojo/source/SourceRequest.java  |  4 +-
 .../manager/common/pojo/source/SourceResponse.java |  4 +-
 .../common/pojo/source/binlog/BinlogSourceDTO.java |  2 +-
 .../source/binlog/BinlogSourceListResponse.java    |  3 +
 .../pojo/source/binlog/BinlogSourceRequest.java    |  3 +
 .../pojo/source/binlog/BinlogSourceResponse.java   |  3 +
 .../manager/common/pojo/stream/StreamNode.java     |  4 ++
 .../manager/common/pojo/stream/StreamSink.java     |  2 +
 .../manager/common/pojo/stream/StreamSource.java   |  2 +
 .../manager/dao/entity/StreamSinkEntity.java       |  4 ++
 .../manager/dao/entity/StreamSourceEntity.java     |  2 +-
 .../resources/mappers/StreamSinkEntityMapper.xml   | 70 ++++++++++++++++++++--
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++---
 .../main/resources/sql/apache_inlong_manager.sql   |  6 +-
 .../manager-web/sql/apache_inlong_manager.sql      |  6 +-
 22 files changed, 163 insertions(+), 24 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 46653155c..0e06580ac 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -59,7 +59,7 @@ public class MySQLBinlogSource extends StreamSource {
     private int port = 3306;
 
     @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
-    private int serverId = 0;
+    private Integer serverId = 0;
 
     @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, "
             + "separate them with commas, for example: db1,test_db*",
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index a2b5096d1..6faca5297 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -115,6 +115,7 @@ public class InlongStreamSourceTransfer {
         binlogSource.setAgentIp(response.getAgentIp());
         binlogSource.setState(State.parseByStatus(response.getStatus()));
         binlogSource.setServerId(response.getServerId());
+        binlogSource.setDataNodeName(response.getDataNodeName());
         DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
                 response.getUser(),
                 response.getPassword());
@@ -193,6 +194,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setHostname(binlogSource.getHostname());
         sourceRequest.setPort(binlogSource.getPort());
         sourceRequest.setServerId(binlogSource.getServerId());
+        sourceRequest.setDataNodeName(binlogSource.getDataNodeName());
         sourceRequest.setIncludeSchema(binlogSource.getIncludeSchema());
         sourceRequest.setServerTimezone(binlogSource.getServerTimezone());
         sourceRequest.setMonitoredDdl(binlogSource.getMonitoredDdl());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
index b8212a99b..0fe441e9c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
@@ -19,10 +19,10 @@ package org.apache.inlong.manager.common.pojo.sink;
 
 import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
 import lombok.Data;
 
 import java.util.Date;
+import java.util.Map;
 
 /**
  * Response of the sink list
@@ -51,6 +51,18 @@ public class SinkListResponse {
     @ApiModelProperty("Sink description")
     private String description;
 
+    @ApiModelProperty("Inlong cluster name")
+    private String inlongClusterName;
+
+    @ApiModelProperty("Data node name")
+    private String dataNodeName;
+
+    @ApiModelProperty("Sort task name")
+    private String sortTaskName;
+
+    @ApiModelProperty("Sort consumer group")
+    private String sortConsumerGroup;
+
     @ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. default is 1")
     private Integer enableCreateResource;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
index ac368e45b..55990a534 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
@@ -49,4 +49,16 @@ public class SinkPageRequest extends PageRequest {
     @ApiModelProperty(value = "Status")
     private Integer status;
 
+    @ApiModelProperty("Inlong cluster name")
+    private String inlongClusterName;
+
+    @ApiModelProperty("Data node name")
+    private String dataNodeName;
+
+    @ApiModelProperty("Sort task name")
+    private String sortTaskName;
+
+    @ApiModelProperty("Sort consumer group")
+    private String sortConsumerGroup;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
index 52793f588..6c5d44358 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
@@ -57,6 +57,18 @@ public class SinkRequest {
     @ApiModelProperty("Sink description")
     private String description;
 
+    @ApiModelProperty("Inlong cluster name")
+    private String inlongClusterName;
+
+    @ApiModelProperty("Data node name")
+    private String dataNodeName;
+
+    @ApiModelProperty("Sort task name")
+    private String sortTaskName;
+
+    @ApiModelProperty("Sort consumer group")
+    private String sortConsumerGroup;
+
     @ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. default is 1")
     private Integer enableCreateResource = 1;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
index 6b25928fc..30f249237 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
@@ -53,6 +53,18 @@ public class SinkResponse {
     @ApiModelProperty("Sink description")
     private String description;
 
+    @ApiModelProperty("Inlong cluster name")
+    private String inlongClusterName;
+
+    @ApiModelProperty("Data node name")
+    private String dataNodeName;
+
+    @ApiModelProperty("Sort task name")
+    private String sortTaskName;
+
+    @ApiModelProperty("Sort consumer group")
+    private String sortConsumerGroup;
+
     @ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. default is 1",
             notes = "Such as create Hive table")
     private Integer enableCreateResource = 1;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
index b69ab7b45..860bc1b43 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
@@ -47,8 +47,8 @@ public class SourceListResponse {
     @ApiModelProperty("Data Serialization, support: csv, json, canal, avro, etc")
     private String serializationType;
 
-    @ApiModelProperty("Id of the source server")
-    private Integer serverId;
+    @ApiModelProperty("Data node name")
+    private String dataNodeName;
 
     @ApiModelProperty("Id of the cluster that collected this source")
     private Integer clusterId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 8f4cd533a..177423797 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -56,8 +56,8 @@ public class SourceRequest {
     @ApiModelProperty("Mac uuid of the agent running the task")
     private String uuid;
 
-    @ApiModelProperty("Id of the source server")
-    private Integer serverId = 0;
+    @ApiModelProperty("Data node name")
+    private String dataNodeName;
 
     @ApiModelProperty("Id of the cluster that collected this source")
     private Integer clusterId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 75b177a6a..f0a678191 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -53,8 +53,8 @@ public class SourceResponse {
     @ApiModelProperty("Mac uuid of the agent running the task")
     private String uuid;
 
-    @ApiModelProperty("Id of the source server")
-    private Integer serverId;
+    @ApiModelProperty("Data node name")
+    private String dataNodeName;
 
     @ApiModelProperty("Id of the cluster that collected this source")
     private Integer clusterId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index 51d55163b..be6ec2c77 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -53,7 +53,7 @@ public class BinlogSourceDTO {
     private int port;
 
     @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
-    private int serverId;
+    private Integer serverId = 0;
 
     @ApiModelProperty("Whether include schema, default is 'false'")
     private String includeSchema;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index 037da1717..b5779fab4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -43,6 +43,9 @@ public class BinlogSourceListResponse extends SourceListResponse {
     @ApiModelProperty("Exposed port of the DB server")
     private int port;
 
+    @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
+    private Integer serverId = 0;
+
     @ApiModelProperty("Whether include schema, default is 'false'")
     private String includeSchema;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index afc7e899e..a7f0d8ab7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -48,6 +48,9 @@ public class BinlogSourceRequest extends SourceRequest {
     @ApiModelProperty("Exposed port of the DB server")
     private int port = 3306;
 
+    @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
+    private Integer serverId = 0;
+
     @ApiModelProperty("Whether include schema, default is 'false'")
     private String includeSchema;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 69d51b492..ad1b87eaf 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -50,6 +50,9 @@ public class BinlogSourceResponse extends SourceResponse {
     @ApiModelProperty("Exposed port of the DB server")
     private int port;
 
+    @ApiModelProperty("Id of physical node of MySQL Cluster, 0 if single node")
+    private Integer serverId = 0;
+
     @ApiModelProperty("Whether include schema, default is 'false'")
     private String includeSchema;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
index bcea68d3c..cc0491cdd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.common.pojo.stream;
 
 import com.google.common.collect.Sets;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.util.Preconditions;
 
@@ -33,6 +34,9 @@ public class StreamNode {
 
     protected List<StreamField> fields;
 
+    @ApiModelProperty("Data node name")
+    protected String dataNodeName;
+
     public void addPre(String pre) {
         Preconditions.checkNotEmpty(pre, "Pre node should not be empty");
         if (preNodes == null) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java
index fc91d4396..a11a737fb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSink.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.stream;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.enums.SinkType;
 
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 
 @Data
+@EqualsAndHashCode(callSuper = true)
 @ApiModel("Stream sink configuration")
 public abstract class StreamSink extends StreamNode {
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java
index 3ac1dac24..7c111f2e5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamSource.java
@@ -20,11 +20,13 @@ package org.apache.inlong.manager.common.pojo.stream;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
+@EqualsAndHashCode(callSuper = true)
 @ApiModel("Stream source configuration")
 public abstract class StreamSource extends StreamNode {
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java
index cd49ad404..cc64ee62e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSinkEntity.java
@@ -32,6 +32,10 @@ public class StreamSinkEntity implements Serializable {
     private String sinkType;
     private String sinkName;
     private String description;
+    private String inlongClusterName;
+    private String dataNodeName;
+    private String sortTaskName;
+    private String sortConsumerGroup;
     private Integer enableCreateResource;
 
     private String operateLog;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 6c68f0198..d51e62886 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -34,7 +34,7 @@ public class StreamSourceEntity implements Serializable {
     private String agentIp;
     private String uuid;
 
-    private Integer serverId;
+    private String dataNodeName;
     private Integer clusterId;
     private String serializationType;
     private String snapshot;
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index fd1c08126..db1ce3f7d 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -29,6 +29,10 @@
         <result column="sink_name" jdbcType="VARCHAR" property="sinkName"/>
         <result column="description" jdbcType="INTEGER" property="description"/>
         <result column="enable_create_resource" jdbcType="TINYINT" property="enableCreateResource"/>
+        <result column="inlong_cluster_name" jdbcType="VARCHAR" property="inlongClusterName"/>
+        <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/>
+        <result column="sort_task_name" jdbcType="VARCHAR" property="sortTaskName"/>
+        <result column="sort_consumer_group" jdbcType="VARCHAR" property="sortConsumerGroup"/>
         <result column="ext_params" jdbcType="VARCHAR" property="extParams"/>
         <result column="operate_log" jdbcType="VARCHAR" property="operateLog"/>
 
@@ -42,10 +46,10 @@
     </resultMap>
 
     <sql id="Base_Column_List">
-        id, inlong_group_id, inlong_stream_id, sink_type, sink_name, description, enable_create_resource, ext_params,
-        operate_log, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
+        id, inlong_group_id, inlong_stream_id, sink_type, sink_name, description, enable_create_resource,
+        inlong_cluster_name, data_node_name, sort_task_name, sort_consumer_group, ext_params, operate_log,
+        status, previous_status, is_deleted, creator, modifier, create_time, modify_time
     </sql>
-
     <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
         select
         <include refid="Base_Column_List"/>
@@ -90,6 +94,18 @@
             <if test="request.status != null and request.status != ''">
                 and status = #{request.status, jdbcType=INTEGER}
             </if>
+            <if test="request.inlongClusterName != null and request.inlongClusterName != ''">
+                and inlong_cluster_name = #{request.inlongClusterName, jdbcType=VARCHAR}
+            </if>
+            <if test="request.dataNodeName != null and request.dataNodeName != ''">
+                and data_node_name = #{request.dataNodeName, jdbcType=VARCHAR}
+            </if>
+            <if test="request.sortTaskName != null and request.sortTaskName != ''">
+                and sort_task_name = #{request.sortTaskName, jdbcType=VARCHAR}
+            </if>
+            <if test="request.sortConsumerGroup != null and request.sortConsumerGroup != ''">
+                and sort_consumer_group = #{request.sortConsumerGroup, jdbcType=VARCHAR}
+            </if>
             order by modify_time desc
         </where>
     </select>
@@ -200,13 +216,17 @@
             parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
         insert into stream_sink (id, inlong_group_id, inlong_stream_id,
                                  sink_type, sink_name, description,
-                                 enable_create_resource, ext_params,
+                                 enable_create_resource, inlong_cluster_name,
+                                 data_node_name, sort_task_name,
+                                 sort_consumer_group, ext_params,
                                  operate_log, status, previous_status,
                                  is_deleted, creator, modifier,
                                  create_time, modify_time)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sinkType,jdbcType=VARCHAR}, #{sinkName,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR},
-                #{enableCreateResource,jdbcType=TINYINT}, #{extParams,jdbcType=VARCHAR},
+                #{enableCreateResource,jdbcType=TINYINT}, #{inlongClusterName,jdbcType=VARCHAR},
+                #{dataNodeName,jdbcType=VARCHAR}, #{sortTaskName,jdbcType=VARCHAR},
+                #{sortConsumerGroup,jdbcType=VARCHAR}, #{extParams,jdbcType=VARCHAR},
                 #{operateLog,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
                 #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
                 #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
@@ -236,6 +256,18 @@
             <if test="enableCreateResource != null">
                 enable_create_resource,
             </if>
+            <if test="inlongClusterName != null">
+                inlong_cluster_name,
+            </if>
+            <if test="dataNodeName != null">
+                data_node_name,
+            </if>
+            <if test="sortTaskName != null">
+                sort_task_name,
+            </if>
+            <if test="sortConsumerGroup != null">
+                sort_consumer_group,
+            </if>
             <if test="extParams != null">
                 ext_params,
             </if>
@@ -286,6 +318,18 @@
             <if test="enableCreateResource != null">
                 #{enableCreateResource,jdbcType=TINYINT},
             </if>
+            <if test="inlongClusterName != null">
+                #{inlongClusterName,jdbcType=VARCHAR},
+            </if>
+            <if test="dataNodeName != null">
+                #{dataNodeName,jdbcType=VARCHAR},
+            </if>
+            <if test="sortTaskName != null">
+                #{sortTaskName,jdbcType=VARCHAR},
+            </if>
+            <if test="sortConsumerGroup != null">
+                #{sortConsumerGroup,jdbcType=VARCHAR},
+            </if>
             <if test="extParams != null">
                 #{extParams,jdbcType=VARCHAR},
             </if>
@@ -338,6 +382,18 @@
             <if test="enableCreateResource != null">
                 enable_create_resource = #{enableCreateResource,jdbcType=TINYINT},
             </if>
+            <if test="inlongClusterName != null">
+                inlong_cluster_name = #{inlongClusterName,jdbcType=VARCHAR},
+            </if>
+            <if test="dataNodeName != null">
+                data_node_name = #{dataNodeName,jdbcType=VARCHAR},
+            </if>
+            <if test="sortTaskName != null">
+                sort_task_name = #{sortTaskName,jdbcType=VARCHAR},
+            </if>
+            <if test="sortConsumerGroup != null">
+                sort_consumer_group = #{sortConsumerGroup,jdbcType=VARCHAR},
+            </if>
             <if test="extParams != null">
                 ext_params = #{extParams,jdbcType=VARCHAR},
             </if>
@@ -376,6 +432,10 @@
             sink_name              = #{sinkName,jdbcType=VARCHAR},
             description            = #{description,jdbcType=VARCHAR},
             enable_create_resource = #{enableCreateResource,jdbcType=TINYINT},
+            inlong_cluster_name    = #{inlongClusterName,jdbcType=VARCHAR},
+            data_node_name         = #{dataNodeName,jdbcType=VARCHAR},
+            sort_task_name         = #{sortTaskName,jdbcType=VARCHAR},
+            sort_consumer_group    = #{sortConsumerGroup,jdbcType=VARCHAR},
             ext_params             = #{extParams,jdbcType=VARCHAR},
             operate_log            = #{operateLog,jdbcType=VARCHAR},
             status                 = #{status,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c4787329b..31e3e65da 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -29,7 +29,7 @@
         <result column="source_name" jdbcType="VARCHAR" property="sourceName"/>
         <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
         <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
-        <result column="server_id" jdbcType="INTEGER" property="serverId"/>
+        <result column="data_node_name" jdbcType="VARCHAR" property="dataNodeName"/>
         <result column="cluster_id" jdbcType="INTEGER" property="clusterId"/>
         <result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/>
         <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
@@ -46,7 +46,7 @@
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, source_type, source_name, agent_ip, uuid,
-        server_id, cluster_id, serialization_type, snapshot, report_time, ext_params,
+        data_node_name, cluster_id, serialization_type, snapshot, report_time, ext_params,
         version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
     </sql>
 
@@ -54,14 +54,14 @@
             parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         insert into stream_source (id, inlong_group_id, inlong_stream_id,
                                    source_type, source_name, agent_ip,
-                                   uuid, server_id, cluster_id,
+                                   uuid, data_node_name, cluster_id,
                                    serialization_type, snapshot,
                                    report_time, ext_params, version,
                                    status, previous_status, is_deleted,
                                    creator, modifier, create_time, modify_time)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR},
-                #{uuid,jdbcType=VARCHAR}, #{serverId,jdbcType=INTEGER}, #{clusterId,jdbcType=INTEGER},
+                #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR}, #{clusterId,jdbcType=INTEGER},
                 #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
                 #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{version,jdbcType=INTEGER},
                 #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
@@ -238,8 +238,8 @@
             <if test="uuid != null">
                 uuid = #{uuid,jdbcType=VARCHAR},
             </if>
-            <if test="serverId != null">
-                server_id = #{serverId,jdbcType=INTEGER},
+            <if test="dataNodeName != null">
+                data_node_name = #{dataNodeName,jdbcType=VARCHAR},
             </if>
             <if test="clusterId != null">
                 cluster_id = #{clusterId,jdbcType=INTEGER},
@@ -291,7 +291,7 @@
             source_name        = #{sourceName,jdbcType=VARCHAR},
             agent_ip           = #{agentIp,jdbcType=VARCHAR},
             uuid               = #{uuid,jdbcType=VARCHAR},
-            server_id          = #{serverId,jdbcType=INTEGER},
+            data_node_name     = #{dataNodeName,jdbcType=VARCHAR},
             cluster_id         = #{clusterId,jdbcType=INTEGER},
             serialization_type = #{serializationType,jdbcType=VARCHAR},
             snapshot           = #{snapshot,jdbcType=LONGVARCHAR},
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 315378904..4ef5d7498 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -528,7 +528,7 @@ CREATE TABLE `stream_source`
     `source_name`        varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
     `agent_ip`           varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
     `uuid`               varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
-    `server_id`          int(11)               DEFAULT NULL COMMENT 'Id of the source server',
+    `data_node_name`     varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `cluster_id`         int(11)               DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
     `serialization_type` varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
     `snapshot`           text                  DEFAULT NULL COMMENT 'Snapshot of this source task',
@@ -585,6 +585,10 @@ CREATE TABLE `stream_sink`
     `sink_name`              varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
     `description`            varchar(500) NULL COMMENT 'Sink description',
     `enable_create_resource` tinyint(2)            DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
+    `inlong_cluster_name`    varchar(128)          DEFAULT NULL COMMENT 'Cluster name, which links to inlong_cluster table',
+    `data_node_name`         varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
+    `sort_task_name`         varchar(512)          DEFAULT NULL COMMENT 'Sort task name or task ID',
+    `sort_consumer_group`    varchar(512)          DEFAULT NULL COMMENT 'Consumer group name for Sort task',
     `ext_params`             text COMMENT 'Another fields, will saved as JSON type',
     `operate_log`            varchar(5000)         DEFAULT NULL COMMENT 'Background operate log',
     `status`                 int(11)               DEFAULT '0' COMMENT 'Status',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index c3895f763..7a8e7bde0 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -555,7 +555,7 @@ CREATE TABLE `stream_source`
     `source_type`        varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
     `agent_ip`           varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
     `uuid`               varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
-    `server_id`          int(11)               DEFAULT NULL COMMENT 'Id of the source server',
+    `data_node_name`     varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `cluster_id`         int(11)               DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
     `serialization_type` varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
     `snapshot`           text                  DEFAULT NULL COMMENT 'Snapshot of this source task',
@@ -614,6 +614,10 @@ CREATE TABLE `stream_sink`
     `sink_name`              varchar(128) NOT NULL DEFAULT '' COMMENT 'Sink name',
     `description`            varchar(500) NULL COMMENT 'Sink description',
     `enable_create_resource` tinyint(1)            DEFAULT '1' COMMENT 'Whether to enable create sink resource? 0: disable, 1: enable. default is 1',
+    `inlong_cluster_name`    varchar(128)          DEFAULT NULL COMMENT 'Cluster name, which links to inlong_cluster table',
+    `data_node_name`         varchar(128)          DEFAULT NULL COMMENT 'Node name, which links to data_node table',
+    `sort_task_name`         varchar(512)          DEFAULT NULL COMMENT 'Sort task name or task ID',
+    `sort_consumer_group`    varchar(512)          DEFAULT NULL COMMENT 'Consumer group name for Sort task',
     `ext_params`             text         NULL COMMENT 'Another fields, will saved as JSON type',
     `operate_log`            text                  DEFAULT NULL COMMENT 'Background operate log',
     `status`                 int(11)               DEFAULT '0' COMMENT 'Status',