You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/11 09:01:05 UTC

[incubator-inlong] branch master updated: [INLONG-3599][Manager] Fix null pointer exception occurred when listing file sources (#3600)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 90ff9dd0a [INLONG-3599][Manager] Fix null pointer exception occurred when listing file sources (#3600)
90ff9dd0a is described below

commit 90ff9dd0a897ac7fbf04fcccba1b616a70f55fc5
Author: healchow <he...@gmail.com>
AuthorDate: Mon Apr 11 17:01:01 2022 +0800

    [INLONG-3599][Manager] Fix null pointer exception occurred when listing file sources (#3600)
---
 .../manager/client/api/source/AgentFileSource.java |  6 +--
 .../common/pojo/source/SourceListResponse.java     |  9 ++--
 .../manager/common/pojo/source/SourceRequest.java  | 10 +---
 .../manager/common/pojo/source/SourceResponse.java |  9 +---
 .../common/pojo/source/file/FileSourceDTO.java     |  6 +--
 .../pojo/source/file/FileSourceListResponse.java   |  6 +--
 .../common/pojo/source/file/FileSourceRequest.java |  6 +--
 .../pojo/source/file/FileSourceResponse.java       |  6 ++-
 .../manager/dao/entity/StreamSourceEntity.java     |  3 +-
 .../resources/mappers/StreamSourceEntityMapper.xml | 61 ++++++++++------------
 .../service/source/StreamSourceServiceImpl.java    | 14 ++---
 .../service/source/file/FileSourceOperation.java   | 13 +++++
 .../main/resources/sql/apache_inlong_manager.sql   | 43 ++++++++-------
 .../manager-web/sql/apache_inlong_manager.sql      | 43 ++++++++-------
 14 files changed, 114 insertions(+), 121 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
index 92953ab93..4a6ccac1b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
@@ -51,9 +51,9 @@ public class AgentFileSource extends StreamSource {
 
     @ApiModelProperty("TimeOffset for collection, "
             + "'1m' means from one minute after, '-1m' means from one minute before, "
-            + "'1h' means from one hour after, '-1h' means from one minute before"
-            + "'1d' means from one day after, '-1d' means from one minute before"
-            + "Null means from current timestamp")
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
     private String timeOffset;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
index cbb7ea131..b69ab7b45 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceListResponse.java
@@ -44,18 +44,15 @@ public class SourceListResponse {
     @ApiModelProperty("Source name, unique in one stream.")
     private String sourceName;
 
+    @ApiModelProperty("Data Serialization, support: csv, json, canal, avro, etc")
+    private String serializationType;
+
     @ApiModelProperty("Id of the source server")
     private Integer serverId;
 
-    @ApiModelProperty("Name of the source server")
-    private String serverName;
-
     @ApiModelProperty("Id of the cluster that collected this source")
     private Integer clusterId;
 
-    @ApiModelProperty("Name of the cluster that collected this source")
-    private String clusterName;
-
     @ApiModelProperty(value = "Status")
     private Integer status;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index ef062faa1..1cc9bf76f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -57,21 +57,15 @@ public class SourceRequest {
     @ApiModelProperty("Id of the source server")
     private Integer serverId;
 
-    @ApiModelProperty("Name of the source server")
-    private String serverName;
-
     @ApiModelProperty("Id of the cluster that collected this source")
     private Integer clusterId;
 
-    @ApiModelProperty("Name of the cluster that collected this source")
-    private String clusterName;
+    @ApiModelProperty("Serialization type, support: csv, json, canal, avro, etc")
+    private String serializationType;
 
     @ApiModelProperty("Snapshot of the source task")
     private String snapshot;
 
-    @ApiModelProperty("Data Serialization, support: csv, json, canal, avro, etc")
-    private String serializationType;
-
     @ApiModelProperty("Version")
     private Integer version;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 08bb3a88a..9c09ae51b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -54,14 +54,11 @@ public class SourceResponse {
     @ApiModelProperty("Id of the source server")
     private Integer serverId;
 
-    @ApiModelProperty("Name of the source server")
-    private String serverName;
-
     @ApiModelProperty("Id of the cluster that collected this source")
     private Integer clusterId;
 
-    @ApiModelProperty("Name of the cluster that collected this source")
-    private String clusterName;
+    @ApiModelProperty("Data Serialization, support: csv, json, canal, avro, etc")
+    private String serializationType;
 
     @ApiModelProperty("Snapshot of this source task")
     private String snapshot;
@@ -87,6 +84,4 @@ public class SourceResponse {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
-    @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
-    private String serializationType;
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
index 7d5f2338a..3c9b6737f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
@@ -48,9 +48,9 @@ public class FileSourceDTO {
 
     @ApiModelProperty("TimeOffset for collection, "
             + "'1m' means from one minute after, '-1m' means from one minute before, "
-            + "'1h' means from one hour after, '-1h' means from one minute before"
-            + "'1d' means from one day after, '-1d' means from one minute before"
-            + "Null means from current timestamp")
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
     private String timeOffset;
 
     public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java
index 84871259e..33ebff47b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java
@@ -40,9 +40,9 @@ public class FileSourceListResponse extends SourceListResponse {
 
     @ApiModelProperty("TimeOffset for collection, "
             + "'1m' means from one minute after, '-1m' means from one minute before, "
-            + "'1h' means from one hour after, '-1h' means from one minute before"
-            + "'1d' means from one day after, '-1d' means from one minute before"
-            + "Null means from current timestamp")
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
     private String timeOffset;
 
     public FileSourceListResponse() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java
index d0c8dfc42..69569f22a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java
@@ -44,9 +44,9 @@ public class FileSourceRequest extends SourceRequest {
 
     @ApiModelProperty("TimeOffset for collection, "
             + "'1m' means from one minute after, '-1m' means from one minute before, "
-            + "'1h' means from one hour after, '-1h' means from one minute before"
-            + "'1d' means from one day after, '-1d' means from one minute before"
-            + "Null means from current timestamp")
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
     private String timeOffset;
 
     public FileSourceRequest() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java
index d8c0e85a2..44edfd3ff 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java
@@ -41,8 +41,10 @@ public class FileSourceResponse extends SourceResponse {
     private String pattern;
 
     @ApiModelProperty("TimeOffset for collection, "
-            + "'1m' means one minute before, '1h' means one hour before, '1d' means one day before, "
-            + "Null means from current timestamp")
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one hour before, "
+            + "'1d' means from one day after, '-1d' means from one day before, "
+            + "Null or blank means from current timestamp")
     private String timeOffset;
 
     public FileSourceResponse() {
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index 34d3ab953..6c68f0198 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -35,9 +35,8 @@ public class StreamSourceEntity implements Serializable {
     private String uuid;
 
     private Integer serverId;
-    private String serverName;
     private Integer clusterId;
-    private String clusterName;
+    private String serializationType;
     private String snapshot;
     private Date reportTime;
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 666bb6029..c4787329b 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -30,9 +30,8 @@
         <result column="agent_ip" jdbcType="VARCHAR" property="agentIp"/>
         <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
         <result column="server_id" jdbcType="INTEGER" property="serverId"/>
-        <result column="server_name" jdbcType="VARCHAR" property="serverName"/>
         <result column="cluster_id" jdbcType="INTEGER" property="clusterId"/>
-        <result column="cluster_name" jdbcType="VARCHAR" property="clusterName"/>
+        <result column="serialization_type" jdbcType="VARCHAR" property="serializationType"/>
         <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
         <result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
@@ -47,7 +46,7 @@
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, source_type, source_name, agent_ip, uuid,
-        server_id, server_name, cluster_id, cluster_name, snapshot, report_time, ext_params,
+        server_id, cluster_id, serialization_type, snapshot, report_time, ext_params,
         version, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
     </sql>
 
@@ -55,15 +54,15 @@
             parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         insert into stream_source (id, inlong_group_id, inlong_stream_id,
                                    source_type, source_name, agent_ip,
-                                   uuid, server_id, server_name,
-                                   cluster_id, cluster_name, snapshot,
+                                   uuid, server_id, cluster_id,
+                                   serialization_type, snapshot,
                                    report_time, ext_params, version,
                                    status, previous_status, is_deleted,
                                    creator, modifier, create_time, modify_time)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR},
-                #{uuid,jdbcType=VARCHAR}, #{serverId,jdbcType=INTEGER}, #{serverName,jdbcType=VARCHAR},
-                #{clusterId,jdbcType=INTEGER}, #{clusterName,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
+                #{uuid,jdbcType=VARCHAR}, #{serverId,jdbcType=INTEGER}, #{clusterId,jdbcType=INTEGER},
+                #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR},
                 #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{version,jdbcType=INTEGER},
                 #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
                 #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
@@ -242,14 +241,11 @@
             <if test="serverId != null">
                 server_id = #{serverId,jdbcType=INTEGER},
             </if>
-            <if test="serverName != null">
-                server_name = #{serverName,jdbcType=VARCHAR},
-            </if>
             <if test="clusterId != null">
                 cluster_id = #{clusterId,jdbcType=INTEGER},
             </if>
-            <if test="clusterName != null">
-                cluster_name = #{clusterName,jdbcType=VARCHAR},
+            <if test="serializationType != null">
+                serialization_type = #{serializationType,jdbcType=VARCHAR},
             </if>
             <if test="snapshot != null">
                 snapshot = #{snapshot,jdbcType=LONGVARCHAR},
@@ -289,27 +285,26 @@
     </update>
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         update stream_source
-        set inlong_group_id  = #{inlongGroupId,jdbcType=VARCHAR},
-            inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-            source_type      = #{sourceType,jdbcType=VARCHAR},
-            source_name      = #{sourceName,jdbcType=VARCHAR},
-            agent_ip         = #{agentIp,jdbcType=VARCHAR},
-            uuid             = #{uuid,jdbcType=VARCHAR},
-            server_id        = #{serverId,jdbcType=INTEGER},
-            server_name      = #{serverName,jdbcType=VARCHAR},
-            cluster_id       = #{clusterId,jdbcType=INTEGER},
-            cluster_name     = #{clusterName,jdbcType=VARCHAR},
-            snapshot         = #{snapshot,jdbcType=LONGVARCHAR},
-            report_time      = #{reportTime,jdbcType=TIMESTAMP},
-            ext_params       = #{extParams,jdbcType=LONGVARCHAR},
-            version          = #{version,jdbcType=INTEGER},
-            status           = #{status,jdbcType=INTEGER},
-            previous_status  = #{previousStatus,jdbcType=INTEGER},
-            is_deleted       = #{isDeleted,jdbcType=INTEGER},
-            creator          = #{creator,jdbcType=VARCHAR},
-            modifier         = #{modifier,jdbcType=VARCHAR},
-            create_time      = #{createTime,jdbcType=TIMESTAMP},
-            modify_time      = #{modifyTime,jdbcType=TIMESTAMP}
+        set inlong_group_id    = #{inlongGroupId,jdbcType=VARCHAR},
+            inlong_stream_id   = #{inlongStreamId,jdbcType=VARCHAR},
+            source_type        = #{sourceType,jdbcType=VARCHAR},
+            source_name        = #{sourceName,jdbcType=VARCHAR},
+            agent_ip           = #{agentIp,jdbcType=VARCHAR},
+            uuid               = #{uuid,jdbcType=VARCHAR},
+            server_id          = #{serverId,jdbcType=INTEGER},
+            cluster_id         = #{clusterId,jdbcType=INTEGER},
+            serialization_type = #{serializationType,jdbcType=VARCHAR},
+            snapshot           = #{snapshot,jdbcType=LONGVARCHAR},
+            report_time        = #{reportTime,jdbcType=TIMESTAMP},
+            ext_params         = #{extParams,jdbcType=LONGVARCHAR},
+            version            = #{version,jdbcType=INTEGER},
+            status             = #{status,jdbcType=INTEGER},
+            previous_status    = #{previousStatus,jdbcType=INTEGER},
+            is_deleted         = #{isDeleted,jdbcType=INTEGER},
+            creator            = #{creator,jdbcType=VARCHAR},
+            modifier           = #{modifier,jdbcType=VARCHAR},
+            create_time        = #{createTime,jdbcType=TIMESTAMP},
+            modify_time        = #{modifyTime,jdbcType=TIMESTAMP}
         where id = #{id,jdbcType=INTEGER}
     </update>
     <update id="updateStatus">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 69b4c10b8..76d9b9f5e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -119,22 +119,22 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     public PageInfo<? extends SourceListResponse> listByCondition(SourcePageRequest request) {
         Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
-        List<StreamSourceEntity> entityPage = sourceMapper.selectByCondition(request);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByCondition(request);
 
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
         Map<SourceType, Page<StreamSourceEntity>> sourceMap = Maps.newHashMap();
-        for (StreamSourceEntity streamSource : entityPage) {
-            SourceType sourceType = SourceType.forType(streamSource.getSourceType());
-            sourceMap.computeIfAbsent(sourceType, k -> new Page<>()).add(streamSource);
+        for (StreamSourceEntity entity : entityList) {
+            SourceType sourceType = SourceType.forType(entity.getSourceType());
+            sourceMap.computeIfAbsent(sourceType, k -> new Page<>()).add(entity);
         }
-        List<SourceListResponse> sourceListResponses = Lists.newArrayList();
+        List<SourceListResponse> responseList = Lists.newArrayList();
         for (Map.Entry<SourceType, Page<StreamSourceEntity>> entry : sourceMap.entrySet()) {
             SourceType sourceType = entry.getKey();
             StreamSourceOperation operation = operationFactory.getInstance(sourceType);
             PageInfo<? extends SourceListResponse> pageInfo = operation.getPageInfo(entry.getValue());
-            sourceListResponses.addAll(pageInfo.getList());
+            responseList.addAll(pageInfo.getList());
         }
-        PageInfo<? extends SourceListResponse> pageInfo = PageInfo.of(sourceListResponses);
+        PageInfo<? extends SourceListResponse> pageInfo = PageInfo.of(responseList);
 
         LOGGER.debug("success to list source page, result size {}", pageInfo.getSize());
         return pageInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
index 2d349368e..819ff7af3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
@@ -18,13 +18,18 @@
 package org.apache.inlong.manager.service.source.file;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.file.FileSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.file.FileSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -42,6 +47,14 @@ public class FileSourceOperation extends AbstractSourceOperation {
     @Autowired
     private ObjectMapper objectMapper;
 
+    @Override
+    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) { // null or empty page: nothing to convert
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, FileSourceListResponse::new));
+    }
+
     @Override
     protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
         FileSourceRequest sourceRequest = (FileSourceRequest) request;
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 052726244..4e1b9f639 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -450,28 +450,27 @@ CREATE TABLE `source_file_detail`
 DROP TABLE IF EXISTS `stream_source`;
 CREATE TABLE `stream_source`
 (
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'ID',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `source_type`      varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
-    `source_name`      varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
-    `agent_ip`         varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
-    `uuid`             varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
-    `server_id`        int(11)               DEFAULT NULL COMMENT 'Id of the source server',
-    `server_name`      varchar(50)           DEFAULT '' COMMENT 'Name of the source server',
-    `cluster_id`       int(11)               DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
-    `cluster_name`     varchar(50)           DEFAULT '' COMMENT 'Name of the cluster that collected this source',
-    `snapshot`         text                  DEFAULT NULL COMMENT 'Snapshot of this source task',
-    `report_time`      timestamp    NULL COMMENT 'Snapshot time',
-    `ext_params`       text                  DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
-    `version`          int(11)               DEFAULT '1' COMMENT 'Stream source version',
-    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'ID',
+    `inlong_group_id`    varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`   varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `source_type`        varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
+    `source_name`        varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
+    `agent_ip`           varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
+    `uuid`               varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
+    `server_id`          int(11)               DEFAULT NULL COMMENT 'Id of the source server',
+    `cluster_id`         int(11)               DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
+    `serialization_type` varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
+    `snapshot`           text                  DEFAULT NULL COMMENT 'Snapshot of this source task',
+    `report_time`        timestamp    NULL COMMENT 'Snapshot time',
+    `ext_params`         text                  DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
+    `version`            int(11)               DEFAULT '1' COMMENT 'Stream source version',
+    `status`             int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`    int(4)                DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`         int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`            varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`           varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
     KEY `source_status_idx` (`status`, `is_deleted`),
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 987ec4820..d3f2c384b 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -474,28 +474,27 @@ CREATE TABLE `source_file_detail`
 DROP TABLE IF EXISTS `stream_source`;
 CREATE TABLE `stream_source`
 (
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'ID',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `source_name`      varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
-    `source_type`      varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
-    `agent_ip`         varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
-    `uuid`             varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
-    `server_id`        int(11)               DEFAULT NULL COMMENT 'Id of the source server',
-    `server_name`      varchar(50)           DEFAULT '' COMMENT 'Name of the source server',
-    `cluster_id`       int(11)               DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
-    `cluster_name`     varchar(50)           DEFAULT '' COMMENT 'Name of the cluster that collected this source',
-    `snapshot`         text                  DEFAULT NULL COMMENT 'Snapshot of this source task',
-    `report_time`      timestamp    NULL COMMENT 'Snapshot time',
-    `ext_params`       text                  DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
-    `version`          int(11)               DEFAULT '1' COMMENT 'Stream source version',
-    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'ID',
+    `inlong_group_id`    varchar(256) NOT NULL COMMENT 'Inlong group id',
+    `inlong_stream_id`   varchar(256) NOT NULL COMMENT 'Inlong stream id',
+    `source_name`        varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
+    `source_type`        varchar(20)           DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
+    `agent_ip`           varchar(40)           DEFAULT NULL COMMENT 'Ip of the agent running the task',
+    `uuid`               varchar(30)           DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
+    `server_id`          int(11)               DEFAULT NULL COMMENT 'Id of the source server',
+    `cluster_id`         int(11)               DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
+    `serialization_type` varchar(20)           DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
+    `snapshot`           text                  DEFAULT NULL COMMENT 'Snapshot of this source task',
+    `report_time`        timestamp    NULL COMMENT 'Snapshot time',
+    `ext_params`         text                  DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
+    `version`            int(11)               DEFAULT '1' COMMENT 'Stream source version',
+    `status`             int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`    int(4)                DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`         int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`            varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`           varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`        timestamp    NULL     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_source_name` (`inlong_group_id`, `inlong_stream_id`, `source_name`, `is_deleted`),
     KEY `source_status_idx` (`status`, `is_deleted`),