You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/08 08:04:18 UTC

[incubator-inlong] branch master updated: [INLONG-3550] Support File source in client and server (#3572)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 268b3dbd7 [INLONG-3550] Support File source in client and server (#3572)
268b3dbd7 is described below

commit 268b3dbd782085fc245f67d5d2bb2bb70d205033
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri Apr 8 16:04:12 2022 +0800

    [INLONG-3550] Support File source in client and server (#3572)
---
 .../apache/inlong/agent/constant/JobConstants.java |  2 -
 .../java/org/apache/inlong/agent/pojo/FileJob.java | 12 ++--
 .../apache/inlong/agent/pojo/JobProfileDto.java    | 20 +++---
 .../inlong/agent/plugin/sinks/ProxySink.java       | 70 +++++++--------------
 .../manager/client/api/source/AgentFileSource.java | 59 +++++++++++++++++
 .../manager/client/api/util/InlongParser.java      | 40 +++++++-----
 .../api/util/InlongStreamSourceTransfer.java       | 59 ++++++++++++++++-
 .../inlong/manager/common/enums/Constant.java      | 12 ----
 .../inlong/manager/common/enums/SourceType.java    |  7 +++
 .../pojo/source/binlog/BinlogSourceRequest.java    |  3 +-
 .../common/pojo/source/file/FileSourceDTO.java     | 73 ++++++++++++++++++++++
 .../pojo/source/file/FileSourceListResponse.java   | 51 +++++++++++++++
 .../common/pojo/source/file/FileSourceRequest.java | 56 +++++++++++++++++
 .../pojo/source/file/FileSourceResponse.java       | 51 +++++++++++++++
 .../pojo/source/kafka/KafkaSourceRequest.java      |  3 +-
 .../dao/mapper/StreamSourceEntityMapper.java       |  8 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 19 +++++-
 .../service/core/impl/AgentServiceImpl.java        | 31 ++++++---
 .../source/AbstractStreamSourceOperation.java      |  3 +-
 .../service/source/StreamSourceServiceImpl.java    |  2 +-
 .../source/binlog/BinlogStreamSourceOperation.java |  5 +-
 .../FileStreamSourceOperation.java}                | 57 ++++++-----------
 .../source/kafka/KafkaStreamSourceOperation.java   |  5 +-
 .../service/core/impl/AgentServiceTest.java        |  6 +-
 .../core/source/StreamSourceServiceTest.java       | 14 ++---
 .../web/controller/CommonDBServerController.java   |  1 +
 .../web/controller/CommonFileServerController.java |  1 +
 27 files changed, 500 insertions(+), 170 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 146dd0601..034e43f73 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -47,9 +47,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern";
     public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset";
     public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait";
-    public static final String JOB_ADDITION_STR = "job.fileJob.additionStr";
     public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit";
-    public static final String JOB_DIR_FILTER_PATH = "job.fileJob.dir.path";
 
     //Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 733d88665..21c94d34b 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -27,15 +27,12 @@ public class FileJob  {
     private Dir dir;
     private Thread thread;
     private int id;
-    private String pattern;
-    private String cycleUnit;
     private String timeOffset;
     private String addictiveString;
 
     @Data
     public static class Dir {
 
-        private String path;
         private String pattern;
     }
 
@@ -54,12 +51,13 @@ public class FileJob  {
     @Data
     public static class FileJobTaskConfig {
 
-        private String dataName;
-        private String path;
-        private int taskId;
         private String pattern;
-        private String cycleUnit;
+        // '1m' means one minute after, '-1m' means one minute before
+        // '1h' means one hour after, '-1h' means one hour before
+        // '1d' means one day after, '-1d' means one day before
+        // Null means from current timestamp
         private String timeOffset;
+        //For example: a=b&c=b&e=f
         private String additionalAttr;
     }
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 6a7587972..19a5f9dd1 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -17,11 +17,6 @@
 
 package org.apache.inlong.agent.pojo;
 
-import static java.util.Objects.requireNonNull;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
-import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN;
-
 import com.google.gson.Gson;
 import lombok.Data;
 import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -29,6 +24,11 @@ import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.common.enums.TaskTypeEnum;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN;
+
 @Data
 public class JobProfileDto {
 
@@ -92,26 +92,20 @@ public class JobProfileDto {
 
     private static FileJob getFileJob(DataConfig dataConfigs) {
         FileJob fileJob = new FileJob();
+        fileJob.setId(dataConfigs.getTaskId());
         fileJob.setTrigger(DEFAULT_TRIGGER);
 
         FileJob.FileJobTaskConfig fileJobTaskConfig = GSON.fromJson(dataConfigs.getExtParams(),
                 FileJob.FileJobTaskConfig.class);
 
         FileJob.Dir dir = new FileJob.Dir();
-        dir.setPattern(fileJobTaskConfig.getDataName());
-        dir.setPath(fileJobTaskConfig.getPath());
+        dir.setPattern(fileJobTaskConfig.getPattern());
         fileJob.setDir(dir);
-
-        fileJob.setId(fileJobTaskConfig.getTaskId());
         fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
 
         if (!fileJobTaskConfig.getAdditionalAttr().isEmpty()) {
             fileJob.setAddictiveString(fileJobTaskConfig.getAdditionalAttr());
         }
-        if (fileJobTaskConfig.getCycleUnit() != null) {
-            fileJob.setCycleUnit(fileJobTaskConfig.getCycleUnit());
-        }
-
         return fileJob;
     }
 
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 514887e7f..8d31a380f 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -17,43 +17,12 @@
 
 package org.apache.inlong.agent.plugin.sinks;
 
-import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_AGENT_IP;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_ID;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_OCEANUS_BL;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_OCEANUS_F;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static org.apache.inlong.agent.constant.JobConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ADDITION_STR;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_IP;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
@@ -62,6 +31,27 @@ import org.apache.inlong.agent.utils.AgentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_SIZE;
+import static org.apache.inlong.agent.constant.JobConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+
 public class ProxySink extends AbstractSink {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);
@@ -212,22 +202,6 @@ public class ProxySink extends AbstractSink {
         }
     }
 
-    private HashMap<String, String> parseAttrFromJobProfile(JobProfile jobProfile) {
-        HashMap<String, String> attr = new HashMap<>();
-        String additionStr = jobProfile.get(JOB_ADDITION_STR, "");
-        if (!additionStr.isEmpty()) {
-            Map<String, String> addAttr = AgentUtils.getAdditionAttr(additionStr);
-            attr.putAll(addAttr);
-        }
-        if (jobProfile.getBoolean(JOB_RETRY, false)) {
-            // used for online compute filter consume
-            attr.put(PROXY_OCEANUS_F, PROXY_OCEANUS_BL);
-        }
-        attr.put(PROXY_KEY_ID, jobProfile.get(JOB_ID));
-        attr.put(PROXY_KEY_AGENT_IP, jobProfile.get(JOB_IP));
-        return attr;
-    }
-
     @Override
     public void destroy() {
         LOGGER.info("destroy sink which sink from source name {}", sourceName);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
new file mode 100644
index 000000000..92953ab93
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/AgentFileSource.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Agent file collection")
+public class AgentFileSource extends StreamSource {
+
+    @ApiModelProperty(value = "DataSource type", required = true)
+    private SourceType sourceType = SourceType.FILE;
+
+    @ApiModelProperty("SyncType for Kafka")
+    private SyncType syncType = SyncType.INCREMENT;
+
+    @ApiModelProperty("Data format type for kafka")
+    private DataFormat dataFormat = DataFormat.NONE;
+
+    @ApiModelProperty(value = "Agent IP address", required = true)
+    private String ip;
+
+    @ApiModelProperty(value = "Path regex pattern for file, such as /a/b/*.txt", required = true)
+    private String pattern;
+
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one minute before"
+            + "'1d' means from one day after, '-1d' means from one minute before"
+            + "Null means from current timestamp")
+    private String timeOffset;
+
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 4042fa1ff..af45811cd 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -41,6 +41,8 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
@@ -52,9 +54,6 @@ import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 
 import java.util.List;
 
-import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
-import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
-
 /**
  * Parser for Inlong entity
  */
@@ -136,6 +135,11 @@ public class InlongParser {
                                 KafkaSourceResponse.class);
                         sourceResponses.add(kafkaSourceResponse);
                         break;
+                    case FILE:
+                        FileSourceResponse fileSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+                                FileSourceResponse.class);
+                        sourceResponses.add(fileSourceResponse);
+                        break;
                     default:
                         throw new RuntimeException(String.format("Unsupport sourceType=%s for Inlong", sourceType));
                 }
@@ -187,18 +191,23 @@ public class InlongParser {
         if (pageInfo.getList() != null && !pageInfo.getList().isEmpty()) {
             SourceListResponse sourceListResponse = pageInfo.getList().get(0);
             SourceType sourceType = SourceType.forType(sourceListResponse.getSourceType());
-            if (sourceType == BINLOG) {
-                return GsonUtil.fromJson(pageInfoJson,
-                        new TypeToken<PageInfo<BinlogSourceListResponse>>() {
-                        }.getType());
+            switch (sourceType) {
+                case BINLOG:
+                    return GsonUtil.fromJson(pageInfoJson,
+                            new TypeToken<PageInfo<BinlogSourceListResponse>>() {
+                            }.getType());
+                case KAFKA:
+                    return GsonUtil.fromJson(pageInfoJson,
+                            new TypeToken<PageInfo<KafkaSourceListResponse>>() {
+                            }.getType());
+                case FILE:
+                    return GsonUtil.fromJson(pageInfoJson,
+                            new TypeToken<PageInfo<FileSourceListResponse>>() {
+                            }.getType());
+                default:
+                    throw new IllegalArgumentException(
+                            String.format("Unsupported sourceType=%s for Inlong", sourceType));
             }
-            if (sourceType == KAFKA) {
-                return GsonUtil.fromJson(pageInfoJson,
-                        new TypeToken<PageInfo<KafkaSourceListResponse>>() {
-                        }.getType());
-            }
-            throw new IllegalArgumentException(
-                    String.format("Unsupported sourceType=%s for Inlong", sourceType));
         } else {
             return new PageInfo<>();
         }
@@ -221,7 +230,8 @@ public class InlongParser {
         if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
             MQType mqType = MQType.forType(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString());
             if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-                InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
+                InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(),
+                        InlongGroupPulsarInfo.class);
                 groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
                 groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
                 groupApproveInfo.setWriteQuorum(pulsarInfo.getWriteQuorum());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 115eb37da..93f513037 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.client.api.StreamSource.State;
 import org.apache.inlong.manager.client.api.StreamSource.SyncType;
 import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.source.AgentFileSource;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
 import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
 import org.apache.inlong.manager.common.enums.SourceType;
@@ -35,6 +36,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
@@ -54,6 +58,8 @@ public class InlongStreamSourceTransfer {
                 return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
             case BINLOG:
                 return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
+            case FILE:
+                return createFileSourceRequest((AgentFileSource) streamSource, streamInfo);
             default:
                 throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
         }
@@ -68,6 +74,9 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.BINLOG && sourceResponse instanceof BinlogSourceResponse) {
             return parseMySQLBinlogSource((BinlogSourceResponse) sourceResponse);
         }
+        if (sourceType == SourceType.FILE && sourceResponse instanceof FileSourceResponse) {
+            return parseAgentFileSource((FileSourceResponse) sourceResponse);
+        }
         throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
@@ -80,6 +89,9 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
             return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
         }
+        if (sourceType == SourceType.FILE && sourceListResponse instanceof FileSourceListResponse) {
+            return parseAgentFileSource((FileSourceListResponse) sourceListResponse);
+        }
         throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
@@ -176,12 +188,34 @@ public class InlongStreamSourceTransfer {
         return binlogSource;
     }
 
+    private static AgentFileSource parseAgentFileSource(FileSourceResponse response) {
+        AgentFileSource fileSource = new AgentFileSource();
+        fileSource.setSourceName(response.getSourceName());
+        fileSource.setState(State.parseByStatus(response.getStatus()));
+        fileSource.setDataFormat(DataFormat.NONE);
+        fileSource.setPattern(response.getPattern());
+        fileSource.setIp(response.getIp());
+        fileSource.setTimeOffset(response.getTimeOffset());
+        return fileSource;
+    }
+
+    private static AgentFileSource parseAgentFileSource(FileSourceListResponse response) {
+        AgentFileSource fileSource = new AgentFileSource();
+        fileSource.setSourceName(response.getSourceName());
+        fileSource.setState(State.parseByStatus(response.getStatus()));
+        fileSource.setDataFormat(DataFormat.NONE);
+        fileSource.setPattern(response.getPattern());
+        fileSource.setIp(response.getIp());
+        fileSource.setTimeOffset(response.getTimeOffset());
+        return fileSource;
+    }
+
     private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo stream) {
         KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
         sourceRequest.setSourceName(kafkaSource.getSourceName());
         sourceRequest.setInlongGroupId(stream.getInlongGroupId());
         sourceRequest.setInlongStreamId(stream.getInlongStreamId());
-        sourceRequest.setSourceType(kafkaSource.getSourceType().name());
+        sourceRequest.setSourceType(kafkaSource.getSourceType().getType());
         sourceRequest.setAgentIp(kafkaSource.getAgentIp());
         sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
         sourceRequest.setTopic(kafkaSource.getTopic());
@@ -204,8 +238,7 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setSourceName(binlogSource.getSourceName());
         sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
         sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
-        sourceRequest.setSourceType(binlogSource.getSourceType().name());
-        sourceRequest.setAgentIp(binlogSource.getAgentIp());
+        sourceRequest.setSourceType(binlogSource.getSourceType().getType());
         DefaultAuthentication authentication = binlogSource.getAuthentication();
         sourceRequest.setUser(authentication.getUserName());
         sourceRequest.setPassword(authentication.getPassword());
@@ -228,4 +261,24 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
         return sourceRequest;
     }
+
+    private static FileSourceRequest createFileSourceRequest(AgentFileSource fileSource, InlongStreamInfo streamInfo) {
+        FileSourceRequest sourceRequest = new FileSourceRequest();
+        sourceRequest.setSourceName(fileSource.getSourceName());
+        sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+        sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+        sourceRequest.setSourceType(fileSource.getSourceType().getType());
+        if (StringUtils.isEmpty(fileSource.getIp())) {
+            throw new IllegalArgumentException(
+                    String.format("AgentIp should not be null for fileSource=%s", fileSource));
+        }
+        sourceRequest.setIp(fileSource.getIp());
+        if (StringUtils.isEmpty(fileSource.getPattern())) {
+            throw new IllegalArgumentException(
+                    String.format("SourcePattern should not be null for fileSource=%s", fileSource));
+        }
+        sourceRequest.setPattern(fileSource.getPattern());
+        sourceRequest.setTimeOffset(fileSource.getTimeOffset());
+        return sourceRequest;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index f614460d9..44c185d71 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -28,14 +28,6 @@ public class Constant {
     public static final String URL_SPLITTER = ",";
     public static final String HOST_SPLITTER = ":";
 
-    public static final String SOURCE_FILE = "FILE";
-
-    public static final String SOURCE_SQL = "SQL";
-
-    public static final String SOURCE_BINLOG = "BINLOG";
-
-    public static final String SOURCE_KAFKA = "KAFKA";
-
     public static final String SINK_HIVE = "HIVE";
 
     public static final String SINK_KAFKA = "KAFKA";
@@ -77,10 +69,6 @@ public class Constant {
 
     public static final String REQUEST_IS_EMPTY = "request is empty";
 
-    public static final String SOURCE_TYPE_IS_EMPTY = "sourceType is empty";
-
-    public static final String SOURCE_TYPE_NOT_SAME = "Expected sourceType is %s, but found %s";
-
     public static final String SINK_TYPE_IS_EMPTY = "sinkType is empty";
 
     public static final String SINK_TYPE_NOT_SAME = "Expected sinkType is %s, but found %s";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 19c17af6a..33ecdca8f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -33,6 +33,13 @@ public enum SourceType {
     BINLOG("BINLOG", TaskTypeEnum.BINLOG),
     KAFKA("KAFKA", TaskTypeEnum.KAFKA);
 
+    public static final String SOURCE_FILE = "FILE";
+    public static final String SOURCE_SQL = "SQL";
+    public static final String SOURCE_BINLOG = "BINLOG";
+    public static final String SOURCE_KAFKA = "KAFKA";
+    public static final String SOURCE_TYPE_IS_EMPTY = "sourceType is empty";
+    public static final String SOURCE_TYPE_NOT_SAME = "Expected sourceType is %s, but found %s";
+
     @Getter
     private final String type;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 4af744106..f5b16d044 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -22,7 +22,6 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -34,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Request of the binlog source info")
-@JsonTypeDefine(value = Constant.SOURCE_BINLOG)
+@JsonTypeDefine(value = SourceType.SOURCE_BINLOG)
 public class BinlogSourceRequest extends SourceRequest {
 
     @ApiModelProperty("Username of the DB server")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
new file mode 100644
index 000000000..7d5f2338a
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceDTO.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * File source information data transfer object
+ */
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class FileSourceDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    @ApiModelProperty("Agent IP address")
+    private String ip;
+
+    @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+    private String pattern;
+
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one minute before"
+            + "'1d' means from one day after, '-1d' means from one minute before"
+            + "Null means from current timestamp")
+    private String timeOffset;
+
+    public static FileSourceDTO getFromRequest(@NotNull FileSourceRequest fileSourceRequest) {
+        return FileSourceDTO.builder()
+                .ip(fileSourceRequest.getIp())
+                .pattern(fileSourceRequest.getPattern())
+                .timeOffset(fileSourceRequest.getTimeOffset())
+                .build();
+    }
+
+    public static FileSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, FileSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java
new file mode 100644
index 000000000..84871259e
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceListResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+
+/**
+ * Response of File source list
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of File source paging list")
+public class FileSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty("Agent IP address")
+    private String ip;
+
+    @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+    private String pattern;
+
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one minute before"
+            + "'1d' means from one day after, '-1d' means from one minute before"
+            + "Null means from current timestamp")
+    private String timeOffset;
+
+    public FileSourceListResponse() {
+        this.setSourceType(SourceType.FILE.getType());
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java
new file mode 100644
index 000000000..11ec99bae
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of File Source Info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the kafka source info")
+@JsonTypeDefine(value = SourceType.SOURCE_FILE)
+public class FileSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("Agent IP address")
+    private String ip;
+
+    @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+    private String pattern;
+
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means from one minute after, '-1m' means from one minute before, "
+            + "'1h' means from one hour after, '-1h' means from one minute before"
+            + "'1d' means from one day after, '-1d' means from one minute before"
+            + "Null means from current timestamp")
+    private String timeOffset;
+
+    public FileSourceRequest() {
+        this.setSourceType(SourceType.FILE.toString());
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java
new file mode 100644
index 000000000..d8c0e85a2
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/file/FileSourceResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.file;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+
+/**
+ * Response of the File source
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the File source")
+public class FileSourceResponse extends SourceResponse {
+
+    @ApiModelProperty("Agent IP address")
+    private String ip;
+
+    @ApiModelProperty("Path regex pattern for file, such as /a/b/*.txt")
+    private String pattern;
+
+    @ApiModelProperty("TimeOffset for collection, "
+            + "'1m' means one minute before, '1h' means one hour before, '1d' means one day before, "
+            + "Null means from current timestamp")
+    private String timeOffset;
+
+    public FileSourceResponse() {
+        this.setSourceType(SourceType.FILE.getType());
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
index 0b6a23091..86e0168dc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceRequest.java
@@ -22,7 +22,6 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
@@ -34,7 +33,7 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Request of the kafka source info")
-@JsonTypeDefine(value = Constant.SOURCE_KAFKA)
+@JsonTypeDefine(value = SourceType.SOURCE_KAFKA)
 public class KafkaSourceRequest extends SourceRequest {
 
     @ApiModelProperty("Kafka topic")
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 1cd75af2a..d8f7d2e36 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -57,7 +57,13 @@ public interface StreamSourceEntityMapper {
     /**
      * Query the tasks by the given status list.
      */
-    List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list);
+    List<StreamSourceEntity> selectByStatus(@Param("list") List<Integer> list, @Param("limit") int limit);
+
+    /**
+     * Query the tasks by the given status list and type List.
+     */
+    List<StreamSourceEntity> selectByStatusAndType(@Param("list") List<Integer> list,
+            @Param("sourceType") List<String> sourceTypes, @Param("limit") int limit);
 
     /**
      * Query the sources with status 20x by the given agent IP and agent UUID.
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index fddab4050..666bb6029 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -156,7 +156,24 @@
             <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
                 #{item}
             </foreach>
-            limit 2
+            limit #{limit, jdbcType=INTEGER}
+        </where>
+    </select>
+    <select id="selectByStatusAndType" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        <where>
+            is_deleted = 0
+            and status in
+            <foreach item="item" index="index" collection="list" open="(" close=")" separator=",">
+                #{item}
+            </foreach>
+            and source_type in
+            <foreach item="item" index="index" collection="sourceType" open="(" close=")" separator=",">
+                #{item}
+            </foreach>
+            limit #{limit, jdbcType=INTEGER}
         </where>
     </select>
     <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 62723bb1a..feb3a1e40 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -32,13 +32,12 @@ import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.enums.SourceState;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
-import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.core.StreamConfigLogService;
@@ -65,18 +64,15 @@ public class AgentServiceImpl implements AgentService {
     private static final int UNISSUED_STATUS = 2;
     private static final int ISSUED_STATUS = 3;
     private static final int MODULUS_100 = 100;
+    private static final int TASK_FETCH_SIZE = 2;
 
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
     private SourceSnapshotOperation snapshotOperation;
     @Autowired
-    private SourceFileDetailEntityMapper fileDetailMapper;
-    @Autowired
     private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
     @Autowired
-    private InlongStreamFieldEntityMapper streamFieldMapper;
-    @Autowired
     private InlongStreamEntityMapper streamMapper;
     @Autowired
     private StreamConfigLogService streamConfigLogService;
@@ -154,6 +150,8 @@ public class AgentServiceImpl implements AgentService {
             throw new BusinessException("agent request or agent ip was empty, just return");
         }
 
+        final String agentIp = request.getAgentIp();
+        final String uuid = request.getUuid();
         // Query the tasks that needed to add or active - without agentIp and uuid
         List<Integer> needAddStatusList;
         if (PullJobTypeEnum.NEVER != PullJobTypeEnum.getPullJobType(request.getPullJobType())) {
@@ -163,10 +161,11 @@ public class AgentServiceImpl implements AgentService {
             LOGGER.warn("agent pull job type is [NEVER], just pull to be active tasks");
             needAddStatusList = Collections.singletonList(SourceState.TO_BE_ISSUED_ACTIVE.getCode());
         }
-        List<StreamSourceEntity> entityList = sourceMapper.selectByStatus(needAddStatusList);
+        List<String> sourceTypes = Lists.newArrayList(SourceType.BINLOG.getType(), SourceType.KAFKA.getType(),
+                SourceType.SQL.getType());
+        List<StreamSourceEntity> entityList = sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes,
+                TASK_FETCH_SIZE);
 
-        String agentIp = request.getAgentIp();
-        String uuid = request.getUuid();
         // Query other tasks by agentIp and uuid - not included status with TO_BE_ISSUED_ADD and TO_BE_ISSUED_ACTIVE
         List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(),
                 SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(),
@@ -175,6 +174,15 @@ public class AgentServiceImpl implements AgentService {
         List<StreamSourceEntity> needIssuedList = sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
         entityList.addAll(needIssuedList);
 
+        List<StreamSourceEntity> fileEntityList = sourceMapper.selectByStatusAndType(needAddStatusList,
+                Lists.newArrayList(SourceType.FILE.getType()), TASK_FETCH_SIZE * 2);
+        for (StreamSourceEntity fileEntity : fileEntityList) {
+            FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(fileEntity.getExtParams());
+            if (agentIp.equals(fileSourceDTO.getIp())) {
+                entityList.add(fileEntity);
+            }
+        }
+
         List<DataConfig> dataConfigs = Lists.newArrayList();
         for (StreamSourceEntity entity : entityList) {
             // Change 20x to 30x
@@ -208,6 +216,11 @@ public class AgentServiceImpl implements AgentService {
         return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
     }
 
+    /**
+     * If status of source is failed, record.
+     *
+     * @param entity
+     */
     private void logFailedStreamSource(StreamSourceEntity entity) {
         InlongStreamConfigLogRequest request = new InlongStreamConfigLogRequest();
         request.setInlongGroupId(entity.getInlongGroupId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
index 8c1dbbc0b..7ab709784 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractStreamSourceOperation.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.SourceState;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@@ -109,7 +110,7 @@ public abstract class AbstractStreamSourceOperation implements StreamSourceOpera
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
         return this.getFromEntity(entity, this::getResponse);
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 40bf56b35..69b4c10b8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -295,7 +295,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         String streamId = request.getInlongStreamId();
         Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
         String sourceType = request.getSourceType();
-        Preconditions.checkNotNull(sourceType, Constant.SOURCE_TYPE_IS_EMPTY);
+        Preconditions.checkNotNull(sourceType, SourceType.SOURCE_TYPE_IS_EMPTY);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
index 6531900ba..2c0d296c6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -57,7 +56,7 @@ public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
 
     @Override
     protected String getSourceType() {
-        return Constant.SOURCE_BINLOG;
+        return SourceType.BINLOG.getType();
     }
 
     @Override
@@ -93,7 +92,7 @@ public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
         }
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
         BinlogSourceDTO dto = BinlogSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
similarity index 64%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
copy to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
index a4562ee17..29e1e4938 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileStreamSourceOperation.java
@@ -15,23 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.source.kafka;
+package org.apache.inlong.manager.service.source.file;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageInfo;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceDTO;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
@@ -41,48 +35,37 @@ import org.springframework.stereotype.Service;
 
 import java.util.function.Supplier;
 
-/**
- * kafka stream source operation.
- */
 @Service
-public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
+public class FileStreamSourceOperation extends AbstractStreamSourceOperation {
 
     @Autowired
     private ObjectMapper objectMapper;
 
     @Override
-    public Boolean accept(SourceType sourceType) {
-        return SourceType.KAFKA == sourceType;
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        FileSourceRequest sourceRequest = (FileSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            FileSourceDTO dto = FileSourceDTO.getFromRequest(sourceRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
     }
 
     @Override
     protected String getSourceType() {
-        return Constant.SOURCE_KAFKA;
+        return SourceType.SOURCE_FILE;
     }
 
     @Override
     protected SourceResponse getResponse() {
-        return new KafkaSourceResponse();
+        return new FileSourceResponse();
     }
 
     @Override
-    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
-        if (CollectionUtils.isEmpty(entityPage)) {
-            return new PageInfo<>();
-        }
-        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, KafkaSourceListResponse::new));
-    }
-
-    @Override
-    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
-        KafkaSourceRequest sourceRequest = (KafkaSourceRequest) request;
-        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
-        try {
-            KafkaSourceDTO dto = KafkaSourceDTO.getFromRequest(sourceRequest);
-            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
-        } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
-        }
+    public Boolean accept(SourceType sourceType) {
+        return sourceType == SourceType.FILE;
     }
 
     @Override
@@ -93,8 +76,8 @@ public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
         }
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
-        KafkaSourceDTO dto = KafkaSourceDTO.getFromJson(entity.getExtParams());
+                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+        FileSourceDTO dto = FileSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
         return result;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
index a4562ee17..f54a3a782 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaStreamSourceOperation.java
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -57,7 +56,7 @@ public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
 
     @Override
     protected String getSourceType() {
-        return Constant.SOURCE_KAFKA;
+        return SourceType.KAFKA.getType();
     }
 
     @Override
@@ -93,7 +92,7 @@ public class KafkaStreamSourceOperation extends AbstractStreamSourceOperation {
         }
         String existType = entity.getSourceType();
         Preconditions.checkTrue(getSourceType().equals(existType),
-                String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+                String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
         KafkaSourceDTO dto = KafkaSourceDTO.getFromJson(entity.getExtParams());
         CommonBeanUtils.copyProperties(entity, result, true);
         CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index 003936bda..e387afef2 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.AgentService;
@@ -49,7 +49,7 @@ public class AgentServiceTest extends ServiceBaseTest {
         BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
         sourceInfo.setInlongGroupId(globalGroupId);
         sourceInfo.setInlongStreamId(globalStreamId);
-        sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
+        sourceInfo.setSourceType(SourceType.BINLOG.getType());
 
         return sourceService.save(sourceInfo, globalOperator);
     }
@@ -70,7 +70,7 @@ public class AgentServiceTest extends ServiceBaseTest {
         Boolean result = agentService.reportSnapshot(request);
         Assert.assertTrue(result);
 
-        sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+        sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index 4b84ecad0..38e14ac92 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.source;
 
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
@@ -51,7 +51,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         sourceInfo.setInlongGroupId(globalGroupId);
         sourceInfo.setInlongStreamId(globalStreamId);
         sourceInfo.setSourceName(sourceName);
-        sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
+        sourceInfo.setSourceType(SourceType.BINLOG.getType());
         return sourceService.save(sourceInfo, globalOperator);
     }
 
@@ -60,7 +60,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         Integer id = this.saveSource();
         Assert.assertNotNull(id);
 
-        boolean result = sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+        boolean result = sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
         Assert.assertTrue(result);
     }
 
@@ -68,16 +68,16 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
     public void testListByIdentifier() {
         Integer id = this.saveSource();
 
-        SourceResponse source = sourceService.get(id, Constant.SOURCE_BINLOG);
+        SourceResponse source = sourceService.get(id, SourceType.BINLOG.getType());
         Assert.assertEquals(globalGroupId, source.getInlongGroupId());
 
-        sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+        sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSource();
-        SourceResponse response = sourceService.get(id, Constant.SOURCE_BINLOG);
+        SourceResponse response = sourceService.get(id, SourceType.BINLOG.getType());
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -86,7 +86,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         boolean result = sourceService.update(request, globalOperator);
         Assert.assertTrue(result);
 
-        sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
+        sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
     }
 
 }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java
index 37e1ceb0d..55f60ab0f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonDBServerController.java
@@ -66,6 +66,7 @@ import org.springframework.web.multipart.MultipartFile;
 @RestController
 @RequestMapping("commonserver/db")
 @Api(tags = "Common Server - DB")
+@Deprecated
 public class CommonDBServerController {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CommonDBServerController.class);
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java
index 09e2cb5b4..d1164b1d4 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/CommonFileServerController.java
@@ -67,6 +67,7 @@ import org.springframework.web.multipart.MultipartFile;
 @RestController
 @RequestMapping("commonserver/file")
 @Api(tags = "Common Server - File")
+@Deprecated
 public class CommonFileServerController {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CommonFileServerController.class);