You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/24 13:17:59 UTC

[incubator-inlong] branch master updated: [INLONG-2689][Manager] Support report snapshot for source agent (#2706)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f52c336  [INLONG-2689][Manager] Support report snapshot for source agent (#2706)
f52c336 is described below

commit f52c336db35ff195e2d3b209272a758b08e991ce
Author: healchow <he...@gmail.com>
AuthorDate: Thu Feb 24 21:17:55 2022 +0800

    [INLONG-2689][Manager] Support report snapshot for source agent (#2706)
    
    * [INLONG-2689][Manager] Support report snapshot for source agent
    
    * [INLONG-2689][Manager] Add and refactor unit tests
    
    * [INLONG-2689][Manager] Add apache license
---
 .../common/pojo/agent/TaskSnapshotMessage.java     |  24 ++--
 ...rtbeatRequest.java => TaskSnapshotRequest.java} |  20 +--
 .../manager/dao/entity/StreamSourceEntity.java     |   1 +
 .../dao/mapper/StreamSourceEntityMapper.java       |   3 +-
 .../resources/mappers/StreamSourceEntityMapper.xml |  44 ++++---
 .../service/source/SourceSnapshotOperation.java    | 139 +++++++++++++++++++++
 .../service/source/StreamSourceService.java        |   4 +-
 .../service/source/StreamSourceServiceImpl.java    |  14 +--
 .../service/core/impl/AuditServiceTest.java        |   2 -
 .../service/core/impl/InlongGroupServiceTest.java  |   3 +-
 .../service/core/impl/InlongStreamServiceTest.java |   3 +-
 ...rviceTest.java => StreamSourceServiceTest.java} |  38 +++++-
 .../main/resources/sql/apache_inlong_manager.sql   |   3 +-
 .../manager-web/sql/apache_inlong_manager.sql      |   3 +-
 .../web/controller/openapi/AgentController.java    |  28 +++--
 15 files changed, 257 insertions(+), 72 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceSnapshotRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotMessage.java
similarity index 66%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceSnapshotRequest.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotMessage.java
index da6c95c..972176e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceSnapshotRequest.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotMessage.java
@@ -15,26 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.source;
+package org.apache.inlong.common.pojo.agent;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
-import javax.validation.constraints.NotNull;
-
 /**
- * Request of the source heartbeat
+ * Snapshot message
  */
 @Data
-@ApiModel("Request of the source heartbeat")
-public class SourceSnapshotRequest {
+public class TaskSnapshotMessage {
 
-    @NotNull
-    @ApiModelProperty("Id of the source")
-    private Integer id;
+    /**
+     * The job id
+     */
+    private Integer jobId;
 
-    @ApiModelProperty("snapshot of this source task")
+    /**
+     * Snapshot of this source task
+     */
     private String snapshot;
 
-}
+}
\ No newline at end of file
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskHeartbeatRequest.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
similarity index 78%
rename from inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskHeartbeatRequest.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
index ed305e6..9874017 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskHeartbeatRequest.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/TaskSnapshotRequest.java
@@ -20,25 +20,24 @@ package org.apache.inlong.common.pojo.agent;
 import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.Data;
 
-import javax.annotation.Nonnull;
 import java.util.Date;
+import java.util.List;
 
 /**
- * Request of the agent task heartbeat
+ * Request of the agent task snapshot
  */
 @Data
-public class TaskHeartbeatRequest {
+public class TaskSnapshotRequest {
 
     /**
-     * The source id
+     * The ip of agent
      */
-    @Nonnull
-    private Integer id;
+    private String agentIp;
 
     /**
-     * Heartbeat of this source task
+     * The mac UUID of agent
      */
-    private String heartbeat;
+    private String uuid;
 
     /**
      * Report time
@@ -46,4 +45,9 @@ public class TaskHeartbeatRequest {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date reportTime;
 
+    /**
+     * The snapshot message list
+     */
+    private List<TaskSnapshotMessage> snapshotList;
+
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
index c6238e3..70259d8 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java
@@ -38,6 +38,7 @@ public class StreamSourceEntity implements Serializable {
     private Integer clusterId;
     private String clusterName;
     private String snapshot;
+    private Date reportTime;
 
     // extParams saved filePath, fileRollingType, dbName, tableName, etc.
     private String extParams;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 1666527..eff8215 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.dao.mapper;
 import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
-import org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.springframework.stereotype.Repository;
@@ -84,7 +83,7 @@ public interface StreamSourceEntityMapper {
 
     int updateStatus(StreamSourceEntity entity);
 
-    int updateSnapshot(SourceSnapshotRequest request);
+    int updateSnapshot(StreamSourceEntity entity);
 
     int deleteByPrimaryKey(Integer id);
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 10e6111..77da5cb 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -32,6 +32,7 @@
         <result column="cluster_id" jdbcType="INTEGER" property="clusterId"/>
         <result column="cluster_name" jdbcType="VARCHAR" property="clusterName"/>
         <result column="snapshot" jdbcType="LONGVARCHAR" property="snapshot"/>
+        <result column="report_time" jdbcType="TIMESTAMP" property="reportTime"/>
         <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
         <result column="status" jdbcType="INTEGER" property="status"/>
         <result column="previous_status" jdbcType="INTEGER" property="previousStatus"/>
@@ -55,8 +56,8 @@
     </resultMap>
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, source_type, agent_ip, uuid, server_id, server_name,
-        cluster_id, cluster_name, snapshot, ext_params, status, previous_status, is_deleted,
-        creator, modifier, create_time, modify_time
+        cluster_id, cluster_name, snapshot, report_time, ext_params, status, previous_status,
+        is_deleted, creator, modifier, create_time, modify_time
     </sql>
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
@@ -64,15 +65,15 @@
         insert into stream_source (id, inlong_group_id, inlong_stream_id,
                                    source_type, agent_ip, uuid,
                                    server_id, server_name, cluster_id,
-                                   cluster_name, snapshot, ext_params,
-                                   status, previous_status,
+                                   cluster_name, snapshot, report_time,
+                                   ext_params, status, previous_status,
                                    is_deleted, creator, modifier,
                                    create_time, modify_time)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{sourceType,jdbcType=VARCHAR}, #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR},
                 #{serverId,jdbcType=INTEGER}, #{serverName,jdbcType=VARCHAR}, #{clusterId,jdbcType=INTEGER},
-                #{clusterName,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR}, #{extParams,jdbcType=LONGVARCHAR},
-                #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
+                #{clusterName,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR}, #{modifyTime,jdbcType=TIMESTAMP},
+                #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
                 #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
                 #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
     </insert>
@@ -113,6 +114,12 @@
             <if test="snapshot != null">
                 snapshot,
             </if>
+            <if test="reportTime != null">
+                report_time,
+            </if>
+            <if test="extParams != null">
+                ext_params,
+            </if>
             <if test="status != null">
                 status,
             </if>
@@ -134,9 +141,6 @@
             <if test="modifyTime != null">
                 modify_time,
             </if>
-            <if test="extParams != null">
-                ext_params,
-            </if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="id != null">
@@ -172,6 +176,9 @@
             <if test="snapshot != null">
                 #{snapshot,jdbcType=LONGVARCHAR},
             </if>
+            <if test="reportTime != null">
+                #{reportTime,jdbcType=TIMESTAMP},
+            </if>
             <if test="extParams != null">
                 #{extParams,jdbcType=LONGVARCHAR},
             </if>
@@ -316,6 +323,9 @@
             <if test="snapshot != null">
                 snapshot = #{snapshot,jdbcType=LONGVARCHAR},
             </if>
+            <if test="reportTime != null">
+                report_time = #{reportTime,jdbcType=TIMESTAMP},
+            </if>
             <if test="extParams != null">
                 ext_params = #{extParams,jdbcType=LONGVARCHAR},
             </if>
@@ -354,7 +364,8 @@
             server_name      = #{serverName,jdbcType=VARCHAR},
             cluster_id       = #{clusterId,jdbcType=INTEGER},
             cluster_name     = #{clusterName,jdbcType=VARCHAR},
-            snapshot        = #{snapshot,jdbcType=LONGVARCHAR},
+            snapshot         = #{snapshot,jdbcType=LONGVARCHAR},
+            report_time      = #{reportTime,jdbcType=TIMESTAMP},
             ext_params       = #{extParams,jdbcType=LONGVARCHAR},
             status           = #{status,jdbcType=INTEGER},
             previous_status  = #{previousStatus,jdbcType=INTEGER},
@@ -372,10 +383,10 @@
             modify_time     = now()
         where id = #{id,jdbcType=INTEGER}
     </update>
-    <update id="updateSnapshot" parameterType="org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest">
+    <update id="updateSnapshot" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
         update stream_source
-        set snapshot   = #{snapshot,jdbcType=LONGVARCHAR},
-            modify_time = now()
+        set snapshot    = #{snapshot,jdbcType=LONGVARCHAR},
+            report_time = #{reportTime,jdbcType=TIMESTAMP}
         where id = #{id,jdbcType=INTEGER}
     </update>
 
@@ -385,20 +396,21 @@
         where id = #{id,jdbcType=INTEGER}
     </delete>
 
-    <select id="selectAgentTaskDataConfig" parameterType="org.apache.inlong.common.pojo.agent.TaskRequest" resultMap="DataConfig">
+    <select id="selectAgentTaskDataConfig" parameterType="org.apache.inlong.common.pojo.agent.TaskRequest"
+            resultMap="DataConfig">
         select
         detail.inlong_group_id,
         detail.inlong_stream_id,
         detail.modify_time,
         detail.uuid,
         detail.agent_ip,
-        detail.status mod 100                             as op,
+        detail.status mod 100 as op,
         detail.id,
         detail.source_type,
         detail.snapshot,
         detail.ext_params,
         from stream_source detail,
-        where  detail.is_deleted = 0
+        where detail.is_deleted = 0
         <if test="agentIp != null and agentIp != ''">
             and detail.agent_ip = #{agentIp, jdbcType=VARCHAR}
         </if>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
new file mode 100644
index 0000000..57c4e32
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/SourceSnapshotOperation.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Operate the source snapshot
+ */
+@Service
+public class SourceSnapshotOperation implements AutoCloseable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperation.class);
+
+    public final ExecutorService executorService = new ThreadPoolExecutor(
+            1,
+            1,
+            10L,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(100),
+            new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(),
+            new CallerRunsPolicy());
+
+    @Autowired
+    private StreamSourceEntityMapper sourceMapper;
+
+    // The queue for transfer source snapshot
+    private LinkedBlockingQueue<TaskSnapshotRequest> snapshotQueue = null;
+
+    @Value("${stream.source.snapshot.batch.size:100}")
+    private int batchSize;
+
+    @Value("${stream.source.snapshot.queue.size:10000}")
+    private int queueSize = 10000;
+
+    private volatile boolean isClose = false;
+
+    /**
+     * Start a thread to operate source snapshot after the app started.
+     */
+    @PostConstruct
+    private void startSaveSnapshotTask() {
+        if (snapshotQueue == null) {
+            snapshotQueue = new LinkedBlockingQueue<>(queueSize);
+        }
+        SaveSnapshotTaskRunnable taskRunnable = new SaveSnapshotTaskRunnable();
+        this.executorService.execute(taskRunnable);
+        LOGGER.info("source snapshot operate thread started successfully");
+    }
+
+    /**
+     * Put snapshot into data queue
+     */
+    public Boolean putData(TaskSnapshotRequest request) {
+        if (request == null || CollectionUtils.isEmpty(request.getSnapshotList())) {
+            LOGGER.info("request received, but snapshot list is empty, just return");
+            return true;
+        }
+        try {
+            snapshotQueue.offer(request);
+            return true;
+        } catch (Throwable t) {
+            LOGGER.error("put source snapshot error", t);
+            return false;
+        }
+    }
+
+    @Override
+    public void close() {
+        this.isClose = true;
+    }
+
+    /**
+     * The task of saving source task snapshot into DB.
+     */
+    private class SaveSnapshotTaskRunnable implements Runnable {
+
+        @Override
+        public void run() {
+            while (!isClose) {
+                try {
+                    TaskSnapshotRequest request = snapshotQueue.poll(1, TimeUnit.SECONDS);
+                    if (request == null || CollectionUtils.isEmpty(request.getSnapshotList())) {
+                        continue;
+                    }
+
+                    List<TaskSnapshotMessage> requestList = request.getSnapshotList();
+                    for (TaskSnapshotMessage message : requestList) {
+                        Integer id = message.getJobId();
+                        StreamSourceEntity entity = new StreamSourceEntity();
+                        entity.setId(id);
+                        entity.setSnapshot(message.getSnapshot());
+                        entity.setReportTime(request.getReportTime());
+
+                        // update snapshot
+                        sourceMapper.updateSnapshot(entity);
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("source snapshot task runnable error", t);
+                }
+            }
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 380d4b5..01bf199 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.source;
 
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -148,6 +148,6 @@ public interface StreamSourceService {
      * @param request Heartbeat request.
      * @return Whether succeed.
      */
-    Boolean reportSnapshot(SourceSnapshotRequest request);
+    Boolean reportSnapshot(TaskSnapshotRequest request);
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 679fc61..cea9fd8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -22,17 +22,16 @@ import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
-import org.apache.inlong.manager.common.pojo.source.SourceSnapshotRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.service.CommonOperateService;
@@ -61,6 +60,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
     private CommonOperateService commonOperateService;
+    @Autowired
+    private SourceSnapshotOperation heartbeatOperation;
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
@@ -72,7 +73,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         // Check if it can be added
         String groupId = request.getInlongGroupId();
-        InlongGroupEntity inlongGroupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+        commonOperateService.checkGroupStatus(groupId, operator);
 
         // Make sure that there is no source info with the current groupId and streamId
         String streamId = request.getInlongStreamId();
@@ -253,11 +254,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     }
 
     @Override
-    public Boolean reportSnapshot(SourceSnapshotRequest request) {
-        if (request == null || request.getId() == null || request.getSnapshot() == null) {
-            return true;
-        }
-        return sourceMapper.updateSnapshot(request) > 0;
+    public Boolean reportSnapshot(TaskSnapshotRequest request) {
+        return heartbeatOperation.putData(request);
     }
 
     private void checkParams(SourceRequest request) {
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
index f91fcf7..e198b51 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
@@ -25,14 +25,12 @@ import org.apache.inlong.manager.service.core.AuditService;
 import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.TestComponent;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-@TestComponent
 public class AuditServiceTest extends ServiceBaseTest {
 
     @Autowired
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
index 4ed7bbc..aaedbb7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
-import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,7 +37,7 @@ import java.util.List;
  * Inlong group service test
  */
 @TestComponent
-public class InlongGroupServiceTest extends ServiceBaseTest {
+public class InlongGroupServiceTest {
 
     private final String globalGroupId = "b_group1";
     private final String globalGroupName = "group1";
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
index 300c1fd..5eb136c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceTest.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,7 +28,7 @@ import org.springframework.boot.test.context.TestComponent;
  * Inlong stream service test
  */
 @TestComponent
-public class InlongStreamServiceTest extends ServiceBaseTest {
+public class InlongStreamServiceTest {
 
     private final String globalGroupId = "b_group1";
     private final String globalGroupName = "group1";
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/BinlogStreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
similarity index 71%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/BinlogStreamSourceServiceTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
index a103d9f..e4cb65b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/BinlogStreamSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/StreamSourceServiceTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.core.source;
 
+import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
@@ -29,10 +31,13 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.Collections;
+import java.util.Date;
+
 /**
  * Stream source service test
  */
-public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
+public class StreamSourceServiceTest extends ServiceBaseTest {
 
     private final String globalGroupId = "b_group1";
     private final String globalStreamId = "stream1";
@@ -49,7 +54,7 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
         BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
         sourceInfo.setInlongGroupId(globalGroupId);
         sourceInfo.setInlongStreamId(globalStreamId);
-        sourceInfo.setSourceType(Constant.SINK_HIVE);
+        sourceInfo.setSourceType(Constant.SOURCE_DB_BINLOG);
 
         return sourceService.save(sourceInfo, globalOperator);
     }
@@ -59,7 +64,7 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
         Integer id = this.saveSource();
         Assert.assertNotNull(id);
 
-        boolean result = sourceService.delete(id, Constant.SINK_HIVE, globalOperator);
+        boolean result = sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
         Assert.assertTrue(result);
     }
 
@@ -67,16 +72,16 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
     public void testListByIdentifier() {
         Integer id = this.saveSource();
 
-        SourceResponse source = sourceService.get(id, Constant.SINK_HIVE);
+        SourceResponse source = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
         Assert.assertEquals(globalGroupId, source.getInlongGroupId());
 
-        sourceService.delete(id, Constant.SINK_HIVE, globalOperator);
+        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
     }
 
     @Test
     public void testGetAndUpdate() {
         Integer id = this.saveSource();
-        SourceResponse response = sourceService.get(id, Constant.SINK_HIVE);
+        SourceResponse response = sourceService.get(id, Constant.SOURCE_DB_BINLOG);
         Assert.assertEquals(globalGroupId, response.getInlongGroupId());
 
         BinlogSourceResponse binlogResponse = (BinlogSourceResponse) response;
@@ -84,6 +89,27 @@ public class BinlogStreamSourceServiceTest extends ServiceBaseTest {
         BinlogSourceRequest request = CommonBeanUtils.copyProperties(binlogResponse, BinlogSourceRequest::new);
         boolean result = sourceService.update(request, globalOperator);
         Assert.assertTrue(result);
+
+        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
+    }
+
+    @Test
+    public void testReportSnapshot() {
+        Integer id = this.saveSource();
+
+        TaskSnapshotRequest request = new TaskSnapshotRequest();
+        request.setAgentIp("127.0.0.1");
+        request.setReportTime(new Date());
+
+        TaskSnapshotMessage message = new TaskSnapshotMessage();
+        message.setJobId(id);
+        message.setSnapshot("{\"offset\": 100}");
+        request.setSnapshotList(Collections.singletonList(message));
+
+        Boolean result = sourceService.reportSnapshot(request);
+        Assert.assertTrue(result);
+
+        sourceService.delete(id, Constant.SOURCE_DB_BINLOG, globalOperator);
     }
 
 }
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index fccb595..75d8725 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -566,7 +566,8 @@ CREATE TABLE `stream_source`
     `server_name`      varchar(50)       DEFAULT '' COMMENT 'Name of the source server',
     `cluster_id`       int(11)           DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
     `cluster_name`     varchar(50)       DEFAULT '' COMMENT 'Name of the cluster that collected this source',
-    `heartbeat`        text              DEFAULT NULL COMMENT 'Heartbeat of this source task',
+    `snapshot`         text              DEFAULT NULL COMMENT 'Snapshot of this source task',
+    `report_time`      timestamp         DEFAULT NULL COMMENT 'Snapshot time',
     `ext_params`       text              DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
     `status`           int(4)            DEFAULT '0' COMMENT 'Data source status',
     `previous_status`  int(4)            DEFAULT '0' COMMENT 'Previous status',
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e921d7a..dbd781b 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -598,7 +598,8 @@ CREATE TABLE `stream_source`
     `server_name`      varchar(50)       DEFAULT '' COMMENT 'Name of the source server',
     `cluster_id`       int(11)           DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
     `cluster_name`     varchar(50)       DEFAULT '' COMMENT 'Name of the cluster that collected this source',
-    `snapshot`        text              DEFAULT NULL COMMENT 'snapshot of this source task',
+    `snapshot`         text              DEFAULT NULL COMMENT 'Snapshot of this source task',
+    `report_time`      timestamp         DEFAULT NULL COMMENT 'Snapshot time',
     `ext_params`       text              DEFAULT NULL COMMENT 'Another fields will saved as JSON string, such as filePath, dbName, tableName, etc',
     `status`           int(4)            DEFAULT '0' COMMENT 'Data source status',
     `previous_status`  int(4)            DEFAULT '0' COMMENT 'Previous status',
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
index 787005e..5fe4bfe 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/AgentController.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller.openapi;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
 import org.apache.inlong.common.pojo.agent.TaskResult;
 import org.apache.inlong.manager.common.beans.Response;
@@ -35,6 +36,7 @@ import org.apache.inlong.manager.service.core.AgentHeartbeatService;
 import org.apache.inlong.manager.service.core.AgentSysConfigService;
 import org.apache.inlong.manager.service.core.AgentTaskService;
 import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -45,11 +47,13 @@ import org.springframework.web.bind.annotation.RestController;
 import java.util.List;
 
 @RestController
-@RequestMapping("/openapi")
+@RequestMapping("/openapi/agent")
 @Api(tags = "Agent Config")
 public class AgentController {
 
     @Autowired
+    private StreamSourceService sourceService;
+    @Autowired
     private AgentTaskService agentTaskService;
     @Autowired
     private AgentSysConfigService agentSysConfigService;
@@ -59,50 +63,56 @@ public class AgentController {
     private ThirdPartyClusterService thirdPartyClusterService;
 
     @GetMapping("/getInLongManagerIp")
-    @ApiOperation(value = "get inlong manager ip list")
+    @ApiOperation(value = "Get inlong manager ip list")
     public Response<List<String>> getInLongManagerIp() {
         return Response.success(thirdPartyClusterService.listClusterIpByType("inlong-openapi"));
     }
 
     @PostMapping("/getTask")
-    @ApiOperation(value = "general fetch task")
+    @ApiOperation(value = "Get source task config")
     public Response<TaskResult> getTask(@RequestBody TaskRequest taskRequest) {
         return Response.success(agentTaskService.getAgentTask(taskRequest));
     }
 
+    @PostMapping("/reportSnapshot")
+    @ApiOperation(value = "Report source task snapshot")
+    public Response<Boolean> reportSnapshot(TaskSnapshotRequest request) {
+        return Response.success(sourceService.reportSnapshot(request));
+    }
+
     @Deprecated
     @PostMapping("/fileAgent/getTaskConf")
-    @ApiOperation(value = "fetch file access task")
+    @ApiOperation(value = "Get file task")
     public Response<FileAgentTaskInfo> getFileAgentTask(@RequestBody FileAgentCommandInfo info) {
         return Response.success(agentTaskService.getFileAgentTask(info));
     }
 
     @PostMapping("/fileAgent/confirmAgentIp")
-    @ApiOperation(value = "confirm current agent ip")
+    @ApiOperation(value = "Confirm current agent ip")
     public Response<String> confirmAgentIp(@RequestBody ConfirmAgentIpRequest info) {
         return Response.success(agentTaskService.confirmAgentIp(info));
     }
 
     @PostMapping("/fileAgent/getAgentSysConf")
-    @ApiOperation(value = "get agent system config")
+    @ApiOperation(value = "Get agent system config")
     public Response<AgentSysConfig> getAgentSysConf(@RequestBody AgentSysconfRequest info) {
         return Response.success(agentSysConfigService.getAgentSysConfig(info));
     }
 
     @PostMapping("/fileAgent/heartbeat")
-    @ApiOperation(value = "agent heartbeat report")
+    @ApiOperation(value = "Report agent heartbeat")
     public Response<String> heartbeat(@RequestBody AgentHeartbeatRequest info) {
         return Response.success(heartbeatService.heartbeat(info));
     }
 
     @PostMapping("/fileAgent/checkAgentTaskConf")
-    @ApiOperation(value = "agent data source comparison")
+    @ApiOperation(value = "Check agent source config")
     public Response<List<FileAgentTaskConfig>> checkAgentTaskConf(@RequestBody CheckAgentTaskConfRequest info) {
         return Response.success(agentTaskService.checkAgentTaskConf(info));
     }
 
     @PostMapping("/fileAgent/reportAgentStatus")
-    @ApiOperation(value = "agent status report")
+    @ApiOperation(value = "Report agent status")
     public Response<String> reportAgentStatus(@RequestBody AgentStatusReportRequest info) {
         return Response.success(agentTaskService.reportAgentStatus(info));
     }