You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/03 13:09:45 UTC

[incubator-inlong] branch master updated: [INLONG-2867][Manager] Support report the task result and get tasks for the agent (#2868)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 859480a  [INLONG-2867][Manager] Support report the task result and get tasks for the agent (#2868)
859480a is described below

commit 859480a8b97e15b9f376415ed5719350dbb2632c
Author: healchow <he...@gmail.com>
AuthorDate: Thu Mar 3 21:09:41 2022 +0800

    [INLONG-2867][Manager] Support report the task result and get tasks for the agent (#2868)
---
 .../apache/inlong/agent/conf/TriggerProfile.java   |  15 ++-
 .../java/org/apache/inlong/agent/db/CommandDb.java |  25 ++--
 .../apache/inlong/agent/pojo/JobProfileDto.java    |   8 +-
 .../apache/inlong/agent/db/TestBerkeleyDBImp.java  |   9 +-
 .../CmdConfig.java => constant/Constants.java}     |  17 +--
 .../org/apache/inlong/common/db/CommandEntity.java |  10 +-
 .../apache/inlong/common/pojo/agent/CmdConfig.java |   5 +
 .../inlong/common/pojo/agent/DataConfig.java       |  13 +-
 .../inlong/common/pojo/agent/TaskRequest.java      |   5 +
 .../inlong/common/pojo/agent/TaskResult.java       |   3 +
 .../api/impl/DefaultInlongStreamBuilder.java       |   2 +-
 .../client/api/source/MySQLBinlogSource.java       |   2 +-
 .../manager/client/api/source/MySQLSource.java     |   4 +-
 .../manager/client/api/util/InlongParser.java      |  23 ++--
 .../api/util/InlongStreamSourceTransfer.java       |   4 +-
 .../inlong/manager/common/enums/Constant.java      |   4 +-
 .../inlong/manager/common/enums/SourceType.java    |   7 +-
 .../common/pojo/agent/FileAgentCommandInfo.java    |   8 +-
 .../pojo/source/binlog/BinlogSourceRequest.java    |   4 +-
 .../pojo/source/binlog/BinlogSourceResponse.java   |   2 +-
 .../dao/mapper/StreamSourceEntityMapper.java       |  18 +--
 .../resources/mappers/StreamSourceEntityMapper.xml |  57 +++-----
 .../{AgentTaskService.java => AgentService.java}   |  25 +++-
 ...tTaskServiceImpl.java => AgentServiceImpl.java} | 150 +++++++++++++++------
 .../service/source/SourceSnapshotOperation.java    |   2 +-
 .../service/source/StreamSourceService.java        |   9 --
 .../service/source/StreamSourceServiceImpl.java    |  16 +--
 .../source/binlog/BinlogStreamSourceOperation.java |   4 +-
 .../listener/AbstractSourceOperateListener.java    |   2 +-
 .../thirdparty/sort/util/SerializationUtils.java   |   2 +-
 .../inlong/manager/service/ServiceBaseTest.java    |   4 +
 .../AgentServiceTest.java}                         |  55 ++------
 .../core/source/StreamSourceServiceTest.java       |  40 +-----
 .../source/listener/DataSourceListenerTest.java    |   4 +-
 .../web/controller/openapi/AgentController.java    |  32 +++--
 35 files changed, 306 insertions(+), 284 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
index 6d1aa79..82bc73b 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TriggerProfile.java
@@ -21,11 +21,19 @@ import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.pojo.JobProfileDto;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
 /**
  * profile used in trigger. Trigger profile is a special job profile
  */
 public class TriggerProfile extends JobProfile {
 
+    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
     /**
      * Parse a given json string and get a TriggerProfile
      */
@@ -67,8 +75,11 @@ public class TriggerProfile extends JobProfile {
         return getInt(JobConstants.JOB_OP);
     }
 
-    public String getDeliveryTime() {
-        return get(JobConstants.JOB_DELIVERY_TIME);
+    public Date getDeliveryTime() {
+        String dateStr = get(JobConstants.JOB_DELIVERY_TIME);
+        LocalDateTime localDateTime = LocalDateTime.parse(dateStr, TIME_FORMATTER);
+        Instant instant = localDateTime.atZone(ZoneId.systemDefault()).toInstant();
+        return Date.from(instant);
     }
 
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
index fedeeb5..66d0697 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/db/CommandDb.java
@@ -17,17 +17,17 @@
 
 package org.apache.inlong.agent.db;
 
-import java.util.List;
 import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.db.CommandEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
+/**
+ * Command for database
+ */
 public class CommandDb {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(CommandDb.class);
-    public static final int MANAGER_SUCCESS_CODE = 0;
-    public static final int MANAGER_FAIL_CODE = 1;
     private final Db db;
 
     public CommandDb(Db db) {
@@ -36,7 +36,6 @@ public class CommandDb {
 
     /**
      * store manager command to db
-     * @param commandEntity
      */
     public void storeCommand(CommandEntity commandEntity) {
         db.putCommand(commandEntity);
@@ -44,7 +43,6 @@ public class CommandDb {
 
     /**
      * get those commands not ack to manager
-     * @return
      */
     public List<CommandEntity> getUnackedCommands() {
         return db.searchCommands(false);
@@ -52,31 +50,26 @@ public class CommandDb {
 
     /**
      * save normal command result for trigger
-     * @param profile
-     * @param success
      */
     public void saveNormalCmds(TriggerProfile profile, boolean success) {
         CommandEntity entity = new CommandEntity();
         entity.setId(CommandEntity.generateCommandId(profile.getTriggerId(), profile.getOpType()));
-        entity.setTaskId(Integer.valueOf(profile.getTriggerId()));
+        entity.setTaskId(Integer.parseInt(profile.getTriggerId()));
         entity.setDeliveryTime(profile.getDeliveryTime());
-        entity.setCommandResult(success ? MANAGER_SUCCESS_CODE : MANAGER_FAIL_CODE);
+        entity.setCommandResult(success ? Constants.RESULT_SUCCESS : Constants.RESULT_FAIL);
         entity.setAcked(false);
         storeCommand(entity);
     }
 
     /**
      * save special command result for trigger (retry\makeup\check)
-     * @param id
-     * @param taskId
-     * @param success
      */
     public void saveSpecialCmds(Integer id, Integer taskId, boolean success) {
         CommandEntity entity = new CommandEntity();
         entity.setId(String.valueOf(id));
         entity.setTaskId(taskId);
         entity.setAcked(false);
-        entity.setCommandResult(success ? MANAGER_SUCCESS_CODE : MANAGER_FAIL_CODE);
+        entity.setCommandResult(success ? Constants.RESULT_SUCCESS : Constants.RESULT_FAIL);
         storeCommand(entity);
     }
 }
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 6393732..4ce2d90 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -17,13 +17,16 @@
 
 package org.apache.inlong.agent.pojo;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import com.google.gson.Gson;
 import lombok.Data;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.TriggerProfile;
-import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.enums.TaskTypeEnum;
+import org.apache.inlong.common.pojo.agent.DataConfig;
+
+import java.util.Date;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
@@ -213,7 +216,8 @@ public class JobProfileDto {
         private String name;
         private String op;
         private String retryTime;
-        private String deliveryTime;
+        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+        private Date deliveryTime;
         private String uuid;
 
         private FileJob fileJob;
diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
index 7d72113..ebecf00 100755
--- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
+++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/db/TestBerkeleyDBImp.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.agent.db;
 
-import java.util.List;
-
 import org.apache.inlong.agent.AgentBaseTestsHelper;
 import org.apache.inlong.common.db.CommandEntity;
 import org.junit.AfterClass;
@@ -26,6 +24,9 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Date;
+import java.util.List;
+
 public class TestBerkeleyDBImp {
 
     private static BerkeleyDbImp db;
@@ -38,7 +39,7 @@ public class TestBerkeleyDBImp {
     }
 
     @AfterClass
-    public static void teardown() throws Exception {
+    public static void teardown() {
         db.close();
         helper.teardownAgentHome();
     }
@@ -80,7 +81,7 @@ public class TestBerkeleyDBImp {
 
     @Test
     public void testCommandDb() {
-        CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, "");
+        CommandEntity commandEntity = new CommandEntity("1", 0, false, 1, new Date());
         db.putCommand(commandEntity);
         CommandEntity command = db.getCommand("1");
         Assert.assertEquals("1", command.getId());
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
similarity index 79%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
index 4f2d0d3..0c2714a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.pojo.agent;
+package org.apache.inlong.common.constant;
 
-import lombok.Data;
+/**
+ * The constants class
+ */
+public class Constants {
+
+    public static final int RESULT_SUCCESS = 0;
+
+    public static final int RESULT_FAIL = 1;
 
-@Data
-public class CmdConfig {
-    private String dataTime;
-    private Integer id;
-    private Integer op;
-    private Integer taskId;
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
index 2ee676a..f9cf516 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/db/CommandEntity.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.common.db;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import com.sleepycat.persist.model.Entity;
 import com.sleepycat.persist.model.PrimaryKey;
 import com.sleepycat.persist.model.Relationship;
@@ -25,6 +26,8 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.util.Date;
+
 @Entity(version = 1)
 @Data
 @AllArgsConstructor
@@ -37,7 +40,12 @@ public class CommandEntity {
     @SecondaryKey(relate = Relationship.MANY_TO_ONE)
     private boolean isAcked;
     private Integer taskId;
-    private String deliveryTime;
+
+    /**
+     * The task delivery time
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date deliveryTime;
 
     public static String generateCommandId(String taskId, int opType) {
         return taskId + opType;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
index 4f2d0d3..d29b350 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/CmdConfig.java
@@ -19,10 +19,15 @@ package org.apache.inlong.common.pojo.agent;
 
 import lombok.Data;
 
+/**
+ * The specific command config for the agent.
+ */
 @Data
 public class CmdConfig {
+
     private String dataTime;
     private Integer id;
     private Integer op;
     private Integer taskId;
+
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
index aed1252..928e8ca 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/DataConfig.java
@@ -17,22 +17,29 @@
 
 package org.apache.inlong.common.pojo.agent;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.Data;
 
+import java.util.Date;
+
+/**
+ * The task config for agent.
+ */
 @Data
 public class DataConfig {
 
+    private String ip;
+    private String uuid;
     private String inlongGroupId;
     private String inlongStreamId;
-    private String deliveryTime;
-    private String uuid;
-    private String ip;
     private String op;
     private Integer jobId;
     private Integer taskType;
     private String snapshot;
     private String syncSend;
     private String extParams;
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date deliveryTime;
 
     public boolean isValid() {
         return true;
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
index cc86605..fdc994a 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskRequest.java
@@ -23,11 +23,16 @@ import org.apache.inlong.common.db.CommandEntity;
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Request task from agent to manager.
+ */
 @Data
 public class TaskRequest {
 
     private String agentIp;
+
     private String uuid;
+
     private List<CommandEntity> commandInfo = new ArrayList<>();
 
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
index a4b1e35..bb746a2 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskResult.java
@@ -22,6 +22,9 @@ import lombok.Data;
 
 import java.util.List;
 
+/**
+ * The task result pulled by the agent from the manager.
+ */
 @Data
 @Builder
 public class TaskResult {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 98e9229..b84538a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -137,7 +137,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
 
     private int initOrUpdateSource(SourceRequest sourceRequest) {
         String sourceType = sourceRequest.getSourceType();
-        if (SourceType.KAFKA.name().equals(sourceType) || SourceType.DB_BINLOG.name().equals(sourceType)) {
+        if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
             List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
                     sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
             if (CollectionUtils.isEmpty(responses)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index 0b3743a..96944e2 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.enums.SourceType;
 public class MySQLBinlogSource extends StreamSource {
 
     @ApiModelProperty(value = "DataSource type", required = true)
-    private SourceType sourceType = SourceType.DB_BINLOG;
+    private SourceType sourceType = SourceType.BINLOG;
 
     @ApiModelProperty("SyncType for MySQL")
     private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index 2846154..d012a8f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -21,19 +21,21 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.common.enums.SourceType;
 
 @Data
+@EqualsAndHashCode(callSuper = true)
 @AllArgsConstructor
 @NoArgsConstructor
 @ApiModel("Base configuration for MySQL collection")
 public class MySQLSource extends StreamSource {
 
     @ApiModelProperty(value = "DataSource type", required = true)
-    private SourceType sourceType = SourceType.DB_SQL;
+    private SourceType sourceType = SourceType.SQL;
 
     @ApiModelProperty("SyncType for MySQL")
     private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 6ccfdf4..c11264b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -41,7 +41,7 @@ import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 
 import java.util.List;
 
-import static org.apache.inlong.manager.common.enums.SourceType.DB_BINLOG;
+import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
 import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
 
 /**
@@ -57,8 +57,7 @@ public class InlongParser {
     public static WorkflowResult parseWorkflowResult(Response response) {
         Object data = response.getData();
         String resultData = GsonUtil.toJson(data);
-        WorkflowResult workflowResult = GsonUtil.fromJson(resultData, WorkflowResult.class);
-        return workflowResult;
+        return GsonUtil.fromJson(resultData, WorkflowResult.class);
     }
 
     public static InlongGroupResponse parseGroupInfo(Response response) {
@@ -69,25 +68,22 @@ public class InlongParser {
     public static PageInfo<InlongGroupListResponse> parseGroupList(Response response) {
         Object data = response.getData();
         String pageInfoJson = GsonUtil.toJson(data);
-        PageInfo<InlongGroupListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+        return GsonUtil.fromJson(pageInfoJson,
                 new TypeToken<PageInfo<InlongGroupListResponse>>() {
                 }.getType());
-        return pageInfo;
     }
 
     public static InlongStreamInfo parseStreamInfo(Response response) {
         Object data = response.getData();
-        InlongStreamInfo streamInfo = GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
-        return streamInfo;
+        return GsonUtil.fromJson(GsonUtil.toJson(data), InlongStreamInfo.class);
     }
 
     public static PageInfo<FullStreamResponse> parseStreamList(Response response) {
         Object data = response.getData();
         String pageInfoJson = GsonUtil.toJson(data);
-        PageInfo<FullStreamResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+        return GsonUtil.fromJson(pageInfoJson,
                 new TypeToken<PageInfo<FullStreamResponse>>() {
                 }.getType());
-        return pageInfo;
     }
 
     public static PageInfo<SourceListResponse> parseSourceList(Response response) {
@@ -99,7 +95,7 @@ public class InlongParser {
         if (pageInfo.getList() != null && !pageInfo.getList().isEmpty()) {
             SourceListResponse sourceListResponse = pageInfo.getList().get(0);
             SourceType sourceType = SourceType.forType(sourceListResponse.getSourceType());
-            if (sourceType == DB_BINLOG) {
+            if (sourceType == BINLOG) {
                 return GsonUtil.fromJson(pageInfoJson,
                         new TypeToken<PageInfo<BinlogSourceListResponse>>() {
                         }.getType());
@@ -119,10 +115,9 @@ public class InlongParser {
     public static PageInfo<SinkListResponse> parseSinkList(Response response) {
         Object data = response.getData();
         String pageInfoJson = GsonUtil.toJson(data);
-        PageInfo<SinkListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+        return GsonUtil.fromJson(pageInfoJson,
                 new TypeToken<PageInfo<SinkListResponse>>() {
                 }.getType());
-        return pageInfo;
     }
 
     public static Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>> parseGroupForm(String formJson) {
@@ -156,9 +151,9 @@ public class InlongParser {
     public static PageInfo<EventLogView> parseEventLogViewList(Response response) {
         Object data = response.getData();
         String pageInfoJson = GsonUtil.toJson(data);
-        PageInfo<EventLogView> pageInfo = GsonUtil.fromJson(pageInfoJson,
+        return GsonUtil.fromJson(pageInfoJson,
                 new TypeToken<PageInfo<EventLogView>>() {
                 }.getType());
-        return pageInfo;
     }
+
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index f03309c..fda2efb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -41,7 +41,7 @@ public class InlongStreamSourceTransfer {
         switch (sourceType) {
             case KAFKA:
                 return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
-            case DB_BINLOG:
+            case BINLOG:
                 return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
             default:
                 throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
@@ -54,7 +54,7 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.KAFKA && sourceListResponse instanceof KafkaSourceListResponse) {
             return parseKafkaSource((KafkaSourceListResponse) sourceListResponse);
         }
-        if (sourceType == SourceType.DB_BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
+        if (sourceType == SourceType.BINLOG && sourceListResponse instanceof BinlogSourceListResponse) {
             return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
         }
         throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index c08ef0f..f950a93 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -26,9 +26,9 @@ public class Constant {
 
     public static final String SOURCE_FILE = "FILE";
 
-    public static final String SOURCE_DB_SQL = "DB_SQL";
+    public static final String SOURCE_SQL = "SQL";
 
-    public static final String SOURCE_DB_BINLOG = "DB_BINLOG";
+    public static final String SOURCE_BINLOG = "BINLOG";
 
     public static final String SOURCE_KAFKA = "KAFKA";
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 99ac6e2..93998e7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -17,15 +17,16 @@
 
 package org.apache.inlong.manager.common.enums;
 
-import java.util.Locale;
 import lombok.Getter;
 import org.apache.inlong.common.enums.TaskTypeEnum;
 
+import java.util.Locale;
+
 public enum SourceType {
     DATABASE_MIGRATION("DATABASE_MIGRATION",TaskTypeEnum.DATABASE_MIGRATION),
     FILE("FILE", TaskTypeEnum.FILE),
-    DB_SQL("DB_SQL", TaskTypeEnum.SQL),
-    DB_BINLOG("DB_BINLOG", TaskTypeEnum.BINLOG),
+    SQL("SQL", TaskTypeEnum.SQL),
+    BINLOG("BINLOG", TaskTypeEnum.BINLOG),
     KAFKA("KAFKA", TaskTypeEnum.KAFKA);
 
     @Getter
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java
index f406ad3..815e842 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/agent/FileAgentCommandInfo.java
@@ -17,11 +17,14 @@
 
 package org.apache.inlong.manager.common.pojo.agent;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.List;
 import lombok.Data;
 
+import java.util.Date;
+import java.util.List;
+
 /**
  * File agent operation result
  */
@@ -43,7 +46,8 @@ public class FileAgentCommandInfo {
         private int commandResult;
 
         @ApiModelProperty(value = "Command issuance time")
-        private long deliveryTime;
+        @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+        private Date deliveryTime;
 
         @ApiModelProperty(value = "task id")
         private int taskId;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
index 8c853f0..da9b968 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceRequest.java
@@ -34,11 +34,11 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
 @ApiModel(value = "Request of the binlog source info")
-@JsonTypeDefine(value = Constant.SOURCE_DB_BINLOG)
+@JsonTypeDefine(value = Constant.SOURCE_BINLOG)
 public class BinlogSourceRequest extends SourceRequest {
 
     public BinlogSourceRequest() {
-        this.setSourceType(SourceType.DB_BINLOG.toString());
+        this.setSourceType(SourceType.BINLOG.toString());
     }
 
     @ApiModelProperty("Username of the DB server")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index a6e2fa5..49388fe 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 @ApiModel(value = "Response of the binlog source")
 public class BinlogSourceResponse extends SourceResponse {
 
-    private String sourceType = Constant.SOURCE_DB_BINLOG;
+    private String sourceType = Constant.SOURCE_BINLOG;
 
     @ApiModelProperty("Username of the DB server")
     private String user;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 725dd2a..cb49a05 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.springframework.stereotype.Repository;
@@ -52,13 +51,13 @@ public interface StreamSourceEntityMapper {
     List<StreamSourceEntity> selectByCondition(@Param("request") SourcePageRequest request);
 
     /**
-     * According to the inlong group id and inlong stream id, query valid source information
+     * Query valid source list by the given group id and stream id.
      *
-     * @param groupId inlong group id
-     * @param streamId inlong stream id
-     * @return Source entity list
+     * @param groupId Inlong group id.
+     * @param streamId Inlong stream id.
+     * @return Source entity list.
      */
-    List<StreamSourceEntity> selectByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
+    List<StreamSourceEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId);
 
     /**
      * According to the group id, stream id and source type, query valid source entity list.
@@ -72,6 +71,11 @@ public interface StreamSourceEntityMapper {
             @Param("sourceType") String sourceType);
 
     /**
+     * Query the source list by the given agent ip and agent uuid.
+     */
+    List<StreamSourceEntity> selectByIpAndUuid(@Param("agentIp") String agentIp, @Param("uuid") String uuid);
+
+    /**
      * Get the distinct source type from the given groupId and streamId
      */
     List<String> selectSourceType(@Param("groupId") String groupId, @Param("streamId") String streamId);
@@ -101,6 +105,4 @@ public interface StreamSourceEntityMapper {
 
     int deleteByPrimaryKey(Integer id);
 
-    List<StreamSourceEntity> selectAgentTaskDataConfig(TaskRequest taskRequest);
-
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 6e58818..80b206f 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -42,18 +42,6 @@
         <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
         <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
     </resultMap>
-    <resultMap id="DataConfig" type="org.apache.inlong.common.pojo.agent.DataConfig">
-        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
-        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
-        <result column="modify_time" jdbcType="VARCHAR" property="deliveryTime"/>
-        <result column="uuid" jdbcType="VARCHAR" property="uuid"/>
-        <result column="agent_ip" jdbcType="VARCHAR" property="ip"/>
-        <result column="op" jdbcType="VARCHAR" property="op"/>
-        <result column="id" jdbcType="VARCHAR" property="taskId"/>
-        <result column="source_type" jdbcType="INTEGER" property="taskType"/>
-        <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
-        <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
-    </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, source_type, agent_ip, uuid, server_id, server_name,
         cluster_id, cluster_name, snapshot, report_time, ext_params, status, previous_status,
@@ -232,13 +220,13 @@
         from stream_source
         <where>
             is_deleted = 0
-            <if test="request.sourceType != null and request.sourceType != ''">
-                and source_type = #{request.sourceType, jdbcType=VARCHAR}
-            </if>
             and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
             <if test="request.inlongStreamId != null and request.inlongStreamId != ''">
                 and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
             </if>
+            <if test="request.sourceType != null and request.sourceType != ''">
+                and source_type = #{request.sourceType, jdbcType=VARCHAR}
+            </if>
             <if test="request.keyWord != null and request.keyWord != ''">
                 and (
                 inlong_group_id like CONCAT('%', #{request.keyWord}, '%')
@@ -251,7 +239,7 @@
             order by modify_time desc
         </where>
     </select>
-    <select id="selectByIdentifier" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+    <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         select
         <include refid="Base_Column_List"/>
         from stream_source
@@ -278,6 +266,18 @@
             </if>
         </where>
     </select>
+    <select id="selectByIpAndUuid" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        <where>
+            is_deleted = 0
+            and agent_ip = #{agentIp, jdbcType=VARCHAR}
+            <if test="uuid != null and uuid != ''">
+                and uuid = #{uuid, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
     <select id="selectSourceType" resultType="java.lang.String">
         select distinct (source_type)
         from stream_source
@@ -410,29 +410,4 @@
         from stream_source
         where id = #{id,jdbcType=INTEGER}
     </delete>
-
-    <select id="selectAgentTaskDataConfig" parameterType="org.apache.inlong.common.pojo.agent.TaskRequest"
-            resultMap="DataConfig">
-        select
-        detail.inlong_group_id,
-        detail.inlong_stream_id,
-        detail.modify_time,
-        detail.uuid,
-        detail.agent_ip,
-        detail.status,
-        detail.id,
-        detail.source_type,
-        detail.snapshot,
-        detail.ext_params
-        from stream_source detail
-        where detail.is_deleted = 0
-        <if test="agentIp != null and agentIp != ''">
-            and detail.agent_ip = #{agentIp, jdbcType=VARCHAR}
-        </if>
-        <if test="uuid != null and uuid != ''">
-            and detail.uuid = #{uuid, jdbcType=VARCHAR}
-        </if>
-        and (floor(detail.STATUS / 100) = 2)
-    </select>
-
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentTaskService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
similarity index 75%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentTaskService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
index dbdf36f..35130a6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentTaskService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AgentService.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.manager.service.core;
 
-import java.util.List;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
 import org.apache.inlong.manager.common.pojo.agent.CheckAgentTaskConfRequest;
 import org.apache.inlong.manager.common.pojo.agent.ConfirmAgentIpRequest;
@@ -27,9 +27,28 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
 
-public interface AgentTaskService {
+import java.util.List;
 
-    TaskResult getAgentTask(TaskRequest taskRequest);
+/**
+ * The service interface for agent
+ */
+public interface AgentService {
+
+    /**
+     * Report the heartbeat for the given source.
+     *
+     * @param request Heartbeat request.
+     * @return Whether succeed.
+     */
+    Boolean reportSnapshot(TaskSnapshotRequest request);
+
+    /**
+     * The agent reports the task result, and pulls the task configs to operate.
+     *
+     * @param request Request of the task result.
+     * @return Task result.
+     */
+    TaskResult reportAndGetTask(TaskRequest request);
 
     @Deprecated
     FileAgentTaskInfo getFileAgentTask(FileAgentCommandInfo info);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 02740a3..5b1993c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -19,20 +19,17 @@ package org.apache.inlong.manager.service.core.impl;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.db.CommandEntity;
 import org.apache.inlong.common.pojo.agent.CmdConfig;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.FileAgentDataGenerateRule;
+import org.apache.inlong.manager.common.enums.SourceState;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
 import org.apache.inlong.manager.common.pojo.agent.CheckAgentTaskConfRequest;
@@ -50,64 +47,135 @@ import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
-import org.apache.inlong.manager.service.core.AgentTaskService;
+import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.source.SourceSnapshotOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 @Service
-@Slf4j
-public class AgentTaskServiceImpl implements AgentTaskService {
+public class AgentServiceImpl implements AgentService {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AgentTaskServiceImpl.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(AgentServiceImpl.class);
+    private static final int UNISSUED_STATUS = 2;
+    private static final int ISSUED_STATUS = 3;
 
     @Autowired
+    private StreamSourceEntityMapper sourceMapper;
+    @Autowired
+    private SourceSnapshotOperation snapshotOperation;
+    @Autowired
     private SourceFileDetailEntityMapper fileDetailMapper;
-
     @Autowired
     private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
-
-    @Autowired
-    private StreamSourceEntityMapper streamSourceMapper;
-
     @Autowired
     private InlongStreamFieldEntityMapper streamFieldMapper;
 
+    /**
+     * If the modification time in the database is later than the reported task delivery time by more than this
+     * value (in milliseconds), the user is considered to have modified the task, and the reported result is ignored.
+     */
+    @Value("${stream.source.maxModifyTime:5000}")
+    private Integer maxModifyTime;
+
+    @Override
+    public Boolean reportSnapshot(TaskSnapshotRequest request) {
+        return snapshotOperation.snapshot(request);
+    }
+
     @Override
-    public TaskResult getAgentTask(TaskRequest taskRequest) {
-        LOGGER.debug("begin to get agent task by taskRequestDto={}", taskRequest);
-        if (taskRequest == null || taskRequest.getAgentIp() == null) {
-            LOGGER.error("agent command taskRequestDto cannot be empty");
+    public TaskResult reportAndGetTask(TaskRequest request) {
+        LOGGER.debug("begin to get agent task: {}", request);
+        if (request == null || request.getAgentIp() == null) {
+            LOGGER.warn("agent request was empty, just return");
             return null;
         }
-        // Query pending tasks by agentIp
-        List<DataConfig> dataConfigs = getAgentDataConfigs(taskRequest);
 
-        // Query pending special commands
-        List<CmdConfig> cmdConfigs = getAgentCmdConfigs(taskRequest);
+        this.updateTaskStatus(request);
 
-        return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
+        return this.getTaskResult(request);
     }
 
-    private List<DataConfig> getAgentDataConfigs(TaskRequest taskRequest) {
-        List<StreamSourceEntity> sourceEntities = streamSourceMapper.selectAgentTaskDataConfig(taskRequest);
-        List<DataConfig> dataConfigs = sourceEntities.stream().map(sourceEntity -> {
+    /**
+     * Get task result by the request
+     */
+    private TaskResult getTaskResult(TaskRequest request) {
+        // Query all undeleted sources by agent ip and uuid (note: the query itself does not filter by status)
+        String agentIp = request.getAgentIp();
+        String uuid = request.getUuid();
+        List<StreamSourceEntity> entityList = sourceMapper.selectByIpAndUuid(agentIp, uuid);
+
+        List<DataConfig> dataConfigs = entityList.stream().map(entity -> {
             DataConfig dataConfig = new DataConfig();
-            dataConfig.setOp(String.valueOf(sourceEntity.getStatus() % 100));
-            dataConfig.setJobId(sourceEntity.getId());
-            SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+            dataConfig.setJobId(entity.getId());
+            SourceType sourceType = SourceType.forType(entity.getSourceType());
             dataConfig.setTaskType(sourceType.getTaskType().getType());
-            dataConfig.setInlongGroupId(sourceEntity.getInlongGroupId());
-            dataConfig.setInlongStreamId(sourceEntity.getInlongStreamId());
-            dataConfig.setIp(sourceEntity.getAgentIp());
-            dataConfig.setUuid(sourceEntity.getUuid());
-            dataConfig.setExtParams(sourceEntity.getExtParams());
-            dataConfig.setSnapshot(sourceEntity.getSnapshot());
+            dataConfig.setOp(String.valueOf(entity.getStatus() % 100));
+            dataConfig.setInlongGroupId(entity.getInlongGroupId());
+            dataConfig.setInlongStreamId(entity.getInlongStreamId());
+            dataConfig.setIp(entity.getAgentIp());
+            dataConfig.setUuid(entity.getUuid());
+            dataConfig.setExtParams(entity.getExtParams());
+            dataConfig.setSnapshot(entity.getSnapshot());
+
             return dataConfig;
         }).collect(Collectors.toList());
-        //Forward Compatible File task type
-        return dataConfigs;
+
+        // Query pending special commands
+        List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
+
+        return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
+    }
+
+    /**
+     * Update the task status by the request
+     */
+    private void updateTaskStatus(TaskRequest request) {
+        if (CollectionUtils.isEmpty(request.getCommandInfo())) {
+            LOGGER.warn("task result was empty, just return");
+            return;
+        }
+
+        for (CommandEntity command : request.getCommandInfo()) {
+            Integer taskId = command.getTaskId();
+            StreamSourceEntity current = sourceMapper.selectByPrimaryKey(taskId);
+            if (current == null) {
+                continue;
+            }
+
+            if (current.getModifyTime().getTime() - command.getDeliveryTime().getTime() > maxModifyTime) {
+                LOGGER.warn("task {} receive result delay more than {} ms, skip it", taskId, maxModifyTime);
+                continue;
+            }
+
+            int result = command.getCommandResult();
+            int previousStatus = current.getStatus();
+            int nextStatus = SourceState.SOURCE_NORMAL.getCode();
+            if (previousStatus / 100 == UNISSUED_STATUS) {
+                if (Constants.RESULT_SUCCESS == result) {
+                    if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
+                        nextStatus = SourceState.SOURCE_NORMAL.getCode();
+                    } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
+                        nextStatus = SourceState.SOURCE_DISABLE.getCode();
+                    } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
+                        nextStatus = SourceState.SOURCE_FROZEN.getCode();
+                    }
+                } else if (Constants.RESULT_FAIL == result) {
+                    nextStatus = SourceState.SOURCE_FAILED.getCode();
+                }
+
+                sourceMapper.updateStatus(taskId, nextStatus);
+            }
+        }
     }
 
     private List<CmdConfig> getAgentCmdConfigs(TaskRequest taskRequest) {
@@ -214,8 +282,8 @@ public class AgentTaskServiceImpl implements AgentTaskService {
                 }
 
             } else { // Modify the result status of the data collection task
-                if (current.getModifyTime().getTime() - command.getDeliveryTime() > 1000 * 5) {
-                    log.warn(" task id {} receive heartbeat time delay more than 5's, skip it!",
+                if (current.getModifyTime().getTime() - command.getDeliveryTime().getTime() > 1000 * 5) {
+                    LOGGER.warn(" task id {} receive heartbeat time delay more than 5's, skip it!",
                             command.getTaskId());
                     continue;
                 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
index 39c6892..e9fadfe 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -170,7 +170,7 @@ public class SourceSnapshotOperation implements AutoCloseable {
      * @see org.apache.inlong.manager.common.enums.SourceState
      */
     private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> getTaskIpAndStatusMap() {
-        ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> ipTaskMap = new ConcurrentHashMap<>();
+        ConcurrentHashMap<String, ConcurrentHashMap<Integer, Integer>> ipTaskMap = new ConcurrentHashMap<>(16);
         List<StreamSourceEntity> sourceList = sourceMapper.selectTempStatusSource();
         for (StreamSourceEntity entity : sourceList) {
             String ip = entity.getAgentIp();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 190b049..5f876b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.service.source;
 
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -174,12 +173,4 @@ public interface StreamSourceService {
         return true;
     }
 
-    /**
-     * Report the heartbeat for given source.
-     *
-     * @param request Heartbeat request.
-     * @return Whether succeed.
-     */
-    Boolean reportSnapshot(TaskSnapshotRequest request);
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index c32c0f5..c326274 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -23,7 +23,6 @@ import com.github.pagehelper.PageInfo;
 import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
@@ -66,8 +65,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
     private CommonOperateService commonOperateService;
-    @Autowired
-    private SourceSnapshotOperation heartbeatOperation;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -111,7 +108,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.debug("begin to list source by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
 
-        List<StreamSourceEntity> entityList = sourceMapper.selectByIdentifier(groupId, streamId);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isEmpty(entityList)) {
             return Collections.emptyList();
         }
@@ -138,7 +135,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             SourceType sourceType = SourceType.forType(entity.getSourceType());
             StreamSourceOperation operation = operationFactory.getInstance(sourceType);
             switch (sourceType) {
-                case DB_BINLOG:
+                case BINLOG:
                     BinlogSourceListResponse binlogSourceListResponse = operation.getFromEntity(entity,
                             BinlogSourceListResponse::new);
                     responses.add(binlogSourceListResponse);
@@ -263,7 +260,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             nextStatus = SourceState.SOURCE_DISABLE.getCode();
         }
         Date now = new Date();
-        List<StreamSourceEntity> entityList = sourceMapper.selectByIdentifier(groupId, streamId);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isNotEmpty(entityList)) {
             for (StreamSourceEntity entity : entityList) {
                 Integer id = entity.getId();
@@ -291,7 +288,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         // Check if it can be deleted
         commonOperateService.checkGroupStatus(groupId, operator);
 
-        List<StreamSourceEntity> entityList = sourceMapper.selectByIdentifier(groupId, streamId);
+        List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId);
         if (CollectionUtils.isNotEmpty(entityList)) {
             entityList.forEach(entity -> sourceMapper.deleteByPrimaryKey(entity.getId()));
         }
@@ -312,11 +309,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         return resultList;
     }
 
-    @Override
-    public Boolean reportSnapshot(TaskSnapshotRequest request) {
-        return heartbeatOperation.snapshot(request);
-    }
-
     private void checkParams(SourceRequest request) {
         Preconditions.checkNotNull(request, Constant.REQUEST_IS_EMPTY);
         String groupId = request.getInlongGroupId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
index d0966e3..6531900 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogStreamSourceOperation.java
@@ -52,12 +52,12 @@ public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {
 
     @Override
     public Boolean accept(SourceType sourceType) {
-        return SourceType.DB_BINLOG == sourceType;
+        return SourceType.BINLOG == sourceType;
     }
 
     @Override
     protected String getSourceType() {
-        return Constant.SOURCE_DB_BINLOG;
+        return Constant.SOURCE_BINLOG;
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
index 610cb0c..d5f3e4b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/listener/AbstractSourceOperateListener.java
@@ -81,7 +81,7 @@ public abstract class AbstractSourceOperateListener implements DataSourceOperate
         String sourceType = sourceResponse.getSourceType();
         SourceType type = SourceType.valueOf(sourceType);
         switch (type) {
-            case DB_BINLOG:
+            case BINLOG:
                 return CommonBeanUtils.copyProperties((BinlogSourceResponse) sourceResponse, BinlogSourceRequest::new);
             case KAFKA:
                 return CommonBeanUtils.copyProperties((KafkaSourceResponse) sourceResponse, KafkaSourceRequest::new);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
index 124fb62..345d5b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SerializationUtils.java
@@ -44,7 +44,7 @@ public class SerializationUtils {
             InlongStreamInfo streamInfo) {
         SourceType sourceType = SourceType.forType(sourceResponse.getSourceType());
         switch (sourceType) {
-            case DB_BINLOG:
+            case BINLOG:
                 return forBinlog((BinlogSourceResponse) sourceResponse, streamInfo);
             case KAFKA:
                 return forKafka((KafkaSourceResponse) sourceResponse, streamInfo);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 7f6cb24..5022d48 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -25,4 +25,8 @@ import org.springframework.boot.test.context.SpringBootTest;
 @SpringBootTest(classes = ServiceBaseTest.class)
 public class ServiceBaseTest extends BaseTest {
 
+    public final String globalGroupId = "b_group1";
+    public final String globalStreamId = "stream1";
+    public final String globalOperator = "admin";
+
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
similarity index 54%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index e4cb65b..003936b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -15,17 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core.source;
+package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
 import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.enums.Constant;
-import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
-import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.junit.Assert;
 import org.junit.Test;
@@ -37,15 +34,13 @@ import java.util.Date;
 /**
  * Stream source service test
  */
-public class StreamSourceServiceTest extends ServiceBaseTest {
-
-    private final String globalGroupId = "b_group1";
-    private final String globalStreamId = "stream1";
-    private final String globalOperator = "test_user";
+public class AgentServiceTest extends ServiceBaseTest {
 
     @Autowired
     private StreamSourceService sourceService;
     @Autowired
+    private AgentService agentService;
+    @Autowired
     private InlongStreamServiceTest streamServiceTest;
 
     public Integer saveSource() {
@@ -54,46 +49,12 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
         sourceInfo.setInlongGroupId(globalGroupId);
         sourceInfo.setInlongStreamId(globalStreamId);
-        sourceInfo.setSourceType(Constant.SOURCE_DB_BINLOG);
+        sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
 
         return sourceService.save(sourceInfo, globalOperator);
     }
 
     @Test
-    public void testSaveAndDelete() {
-        Integer id = this.saveSource();
-        Assert.assertNotNull(id);
-
-        boolean result = sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
-        Assert.assertTrue(result);
-    }
-
-    @Test
-    public void testListByIdentifier() {
-        Integer id = this.saveSource();
-
-        SourceResponse source = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
-        Assert.assertEquals(globalGroupId, source.getInlongGroupId());
-
-        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
-    }
-
-    @Test
-    public void testGetAndUpdate() {
-        Integer id = this.saveSource();
-        SourceResponse response = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
-        Assert.assertEquals(globalGroupId, response.getInlongGroupId());
-
-        BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
-
-        BinlogSourceRequest request = CommonBeanUtils.copyProperties(binlogResponse, BinlogSourceRequest::new);
-        boolean result = sourceService.update(request, globalOperator);
-        Assert.assertTrue(result);
-
-        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
-    }
-
-    @Test
     public void testReportSnapshot() {
         Integer id = this.saveSource();
 
@@ -106,10 +67,10 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         message.setSnapshot("{\"offset\": 100}");
         request.setSnapshotList(Collections.singletonList(message));
 
-        Boolean result = sourceService.reportSnapshot(request);
+        Boolean result = agentService.reportSnapshot(request);
         Assert.assertTrue(result);
 
-        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+        sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index e4cb65b..1f6588a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.inlong.manager.service.core.source;
 
-import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
@@ -31,18 +29,11 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.Collections;
-import java.util.Date;
-
 /**
  * Stream source service test
  */
 public class StreamSourceServiceTest extends ServiceBaseTest {
 
-    private final String globalGroupId = "b_group1";
-    private final String globalStreamId = "stream1";
-    private final String globalOperator = "test_user";
-
     @Autowired
     private StreamSourceService sourceService;
     @Autowired
@@ -54,7 +45,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
         sourceInfo.setInlongGroupId(globalGroupId);
         sourceInfo.setInlongStreamId(globalStreamId);
-        sourceInfo.setSourceType(Constant.SOURCE_DB_BINLOG);
+        sourceInfo.setSourceType(Constant.SOURCE_BINLOG);
 
         return sourceService.save(sourceInfo, globalOperator);
     }
@@ -64,7 +55,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         Integer id = this.saveSource();
         Assert.assertNotNull(id);
 
-        boolean result = sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+        boolean result = sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
         Assert.assertTrue(result);
     }
 
@@ -72,16 +63,16 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
     public void testListByIdentifier() {
         Integer id = this.saveSource();
 
-        SourceResponse source = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
+        SourceResponse source = sourceService.get(id, Constant.SOURCE_BINLOG);
         Assert.assertEquals(globalGroupId, source.getInlongGroupId());
 
-        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+        sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSource();
-        SourceResponse response = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
+        SourceResponse response = sourceService.get(id, Constant.SOURCE_BINLOG);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -90,26 +81,7 @@ public class StreamSourceServiceTest extends ServiceBaseTest {
         boolean result = sourceService.update(request, globalOperator);
         Assert.assertTrue(result);
 
-        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
-    }
-
-    @Test
-    public void testReportSnapshot() {
-        Integer id = this.saveSource();
-
-        TaskSnapshotRequest request = new TaskSnapshotRequest();
-        request.setAgentIp("127.0.0.1");
-        request.setReportTime(new Date());
-
-        TaskSnapshotMessage message = new TaskSnapshotMessage();
-        message.setJobId(id);
-        message.setSnapshot("{\"offset\": 100}");
-        request.setSnapshotList(Collections.singletonList(message));
-
-        Boolean result = sourceService.reportSnapshot(request);
-        Assert.assertTrue(result);
-
-        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+        sourceService.delete(id, Constant.SOURCE_BINLOG, globalOperator);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
index 2f49ce4..7d85a03 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
@@ -80,7 +80,7 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
         WorkflowProcess process = context.getProcess();
         WorkflowTask task = process.getTaskByName("stopSource");
         Assert.assertTrue(task instanceof ServiceTask);
-        SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.DB_BINLOG.toString());
+        SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.BINLOG.toString());
         Assert.assertSame(SourceState.forCode(sourceResponse.getStatus()), SourceState.TO_BE_ISSUED_FROZEN);
     }
 
@@ -109,7 +109,7 @@ public class DataSourceListenerTest extends WorkflowServiceImplTest {
         WorkflowProcess process = context.getProcess();
         WorkflowTask task = process.getTaskByName("restartSource");
         Assert.assertTrue(task instanceof ServiceTask);
-        SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.DB_BINLOG.toString());
+        SourceResponse sourceResponse = streamSourceService.get(sourceId, SourceType.BINLOG.toString());
         Assert.assertSame(SourceState.forCode(sourceResponse.getStatus()), SourceState.TO_BE_ISSUED_ACTIVE);
     }
 
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index ccef5df..ceb2983 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -19,9 +19,9 @@ package org.apache.inlong.manager.web.controller.openapi;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.agent.AgentHeartbeatRequest;
 import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
@@ -33,10 +33,9 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
 import org.apache.inlong.manager.service.core.AgentHeartbeatService;
+import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.core.AgentSysConfigService;
-import org.apache.inlong.manager.service.core.AgentTaskService;
 import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -52,9 +51,7 @@ import java.util.List;
 public class AgentController {
 
     @Autowired
-    private StreamSourceService sourceService;
-    @Autowired
-    private AgentTaskService agentTaskService;
+    private AgentService agentService;
     @Autowired
     private AgentSysConfigService agentSysConfigService;
     @Autowired
@@ -68,29 +65,29 @@ public class AgentController {
         return Response.success(thirdPartyClusterService.listClusterIpByType("inlong-openapi"));
     }
 
-    @PostMapping("/getTask")
-    @ApiOperation(value = "Get source task config")
-    public Response<TaskResult> getTask(@RequestBody TaskRequest taskRequest) {
-        return Response.success(agentTaskService.getAgentTask(taskRequest));
-    }
-
     @PostMapping("/reportSnapshot")
     @ApiOperation(value = "Report source task snapshot")
     public Response<Boolean> reportSnapshot(TaskSnapshotRequest request) {
-        return Response.success(sourceService.reportSnapshot(request));
+        return Response.success(agentService.reportSnapshot(request));
+    }
+
+    @PostMapping("/reportAndGetTask")
+    @ApiOperation(value = "Report task result and get next tasks")
+    public Response<TaskResult> reportAndGetTask(@RequestBody TaskRequest request) {
+        return Response.success(agentService.reportAndGetTask(request));
     }
 
     @Deprecated
     @PostMapping("/fileAgent/getTaskConf")
     @ApiOperation(value = "Get file task")
     public Response<FileAgentTaskInfo> getFileAgentTask(@RequestBody FileAgentCommandInfo info) {
-        return Response.success(agentTaskService.getFileAgentTask(info));
+        return Response.success(agentService.getFileAgentTask(info));
     }
 
     @PostMapping("/fileAgent/confirmAgentIp")
     @ApiOperation(value = "Confirm current agent ip")
     public Response<String> confirmAgentIp(@RequestBody ConfirmAgentIpRequest info) {
-        return Response.success(agentTaskService.confirmAgentIp(info));
+        return Response.success(agentService.confirmAgentIp(info));
     }
 
     @PostMapping("/fileAgent/getAgentSysConf")
@@ -108,12 +105,13 @@ public class AgentController {
     @PostMapping("/fileAgent/checkAgentTaskConf")
     @ApiOperation(value = "Check agent source config")
     public Response<List<FileAgentTaskConfig>> checkAgentTaskConf(@RequestBody CheckAgentTaskConfRequest info) {
-        return Response.success(agentTaskService.checkAgentTaskConf(info));
+        return Response.success(agentService.checkAgentTaskConf(info));
     }
 
     @PostMapping("/fileAgent/reportAgentStatus")
     @ApiOperation(value = "Report agent status")
     public Response<String> reportAgentStatus(@RequestBody AgentStatusReportRequest info) {
-        return Response.success(agentTaskService.reportAgentStatus(info));
+        return Response.success(agentService.reportAgentStatus(info));
     }
+
 }