You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/26 03:09:18 UTC

[incubator-inlong] branch master updated: [INLONG-3856][Manager] Add beat heart common report handler (#3871)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 38a320807 [INLONG-3856][Manager] Add beat heart common report handler (#3871)
38a320807 is described below

commit 38a3208078d3d6926d7a682c77d0ba04feafa18d
Author: baomingyu <ba...@163.com>
AuthorDate: Tue Apr 26 11:09:13 2022 +0800

    [INLONG-3856][Manager] Add beat heart common report handler (#3871)
---
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   2 +
 .../common/pojo/heartbeat/ComponentHeartbeat.java} |  27 +-
 .../heartbeat/ComponentHeartbeatPageRequest.java}  |  35 +-
 .../pojo/heartbeat/ComponentHeartbeatRequest.java} |  34 +-
 .../heartbeat/ComponentHeartbeatResponse.java}     |  41 +-
 .../common/pojo/heartbeat/GroupHeartbeat.java}     |  27 +-
 .../pojo/heartbeat/GroupHeartbeatPageRequest.java} |  35 +-
 .../pojo/heartbeat/GroupHeartbeatRequest.java}     |  35 +-
 .../pojo/heartbeat/GroupHeartbeatResponse.java}    |  44 +-
 .../pojo/heartbeat/HeartbeatReportRequest.java}    |  43 +-
 .../common/pojo/heartbeat/StreamHeartbeat.java}    |  29 +-
 .../heartbeat/StreamHeartbeatPageRequest.java}     |  37 +-
 .../pojo/heartbeat/StreamHeartbeatRequest.java}    |  38 +-
 .../pojo/heartbeat/StreamHeartbeatResponse.java}   |  47 +-
 .../dao/entity/ComponentHeartbeatEntity.java}      |  46 +-
 .../entity/ComponentHeartbeatEntityWithBLOBs.java} |  29 +-
 .../manager/dao/entity/GroupHeartbeatEntity.java}  |  48 +-
 .../dao/entity/GroupHeartbeatEntityWithBLOBs.java} |  28 +-
 .../manager/dao/entity/StreamHeartbeatEntity.java} |  50 ++-
 .../entity/StreamHeartbeatEntityWithBLOBs.java}    |  29 +-
 .../mapper/ComponentHeartbeatEntityMapper.java}    |  44 +-
 .../dao/mapper/GroupHeartbeatEntityMapper.java}    |  46 +-
 .../dao/mapper/StreamHeartbeatEntityMapper.java    |  41 ++
 .../src/main/resources/generatorConfig.xml         |  18 +
 .../mappers/ComponentHeartbeatEntityMapper.xml     |  83 ++++
 .../mappers/GroupHeartbeatEntityMapper.xml         |  86 ++++
 .../mappers/StreamHeartbeatEntityMapper.xml        |  87 ++++
 .../manager/service/core/HeartbeatService.java     | 101 +++++
 .../service/core/impl/HeartbeatServiceImpl.java    | 500 +++++++++++++++++++++
 inlong-manager/manager-test/pom.xml                |   9 +-
 .../org/apache/inlong/manager/test/BaseTest.java   |   3 +
 .../main/resources/sql/apache_inlong_manager.sql   |  85 ++++
 .../manager-web/sql/apache_inlong_manager.sql      |  62 +++
 .../web/controller/HeartbeatInfoController.java    | 133 ++++++
 .../controller/openapi/HeartbeatController.java    |  47 ++
 .../openapi/HeartbeatControllerTest.java           | 210 +++++++++
 inlong-manager/pom.xml                             |   7 +
 37 files changed, 1844 insertions(+), 422 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index f50e56b8f..3ec8491a2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -27,6 +27,8 @@ public enum ErrorCodeEnum {
     GROUP_ID_IS_EMPTY(102, "Inlong group id is empty"),
     STREAM_ID_IS_EMPTY(103, "Inlong stream id is empty"),
     REQUEST_IS_EMPTY(104, "Request is empty"),
+    REQUEST_COMPONENT_EMPTY(105, "Component is empty"),
+    REQUEST_INSTANCE_EMPTY(106, "Instance is empty"),
     USER_IS_NOT_MANAGER(110, "%s is not the manager, please contact %s"),
 
     GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
index a588fabd9..61cc41911 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
@@ -15,27 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class ComponentHeartbeat {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    private String statusHeartbeat;
 
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    private String metricHeartbeat;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
index a588fabd9..06d95756f 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
@@ -15,27 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.inlong.manager.common.beans.PageRequest;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong component heartbeats query request")
+public class ComponentHeartbeatPageRequest
+        extends PageRequest {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private String component;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
index a588fabd9..bb2dd27c4 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
@@ -15,27 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong component heartbeat query request")
+public class ComponentHeartbeatRequest {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private String instance;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
index a588fabd9..68e944a7e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
@@ -15,27 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong component heartbeat query response")
+public class ComponentHeartbeatResponse {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
+    @ApiModelProperty
+    private String instance;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private long reportTime;
+
+    @ApiModelProperty
+    private String statusHeartbeat;
+
+    @ApiModelProperty
+    private String metricHeartbeat;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
index a588fabd9..f2e81f9ed 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
@@ -15,27 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class GroupHeartbeat {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    private String inlongGroupId;
 
-    @Autowired
-    DataSource dataSource;
+    private String statusHeartbeat;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    private String metricHeartbeat;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
index a588fabd9..23e72f3d0 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
@@ -15,27 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.inlong.manager.common.beans.PageRequest;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+@Getter
+@Setter
+@ApiModel("Inlong group heartbeats request")
+public class GroupHeartbeatPageRequest extends PageRequest {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private String instance;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
index a588fabd9..79ec29a78 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
@@ -15,27 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+@Getter
+@Setter
+@ApiModel("Inlong group heartbeat request")
+public class GroupHeartbeatRequest {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
+    @ApiModelProperty
+    private String instance;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private String inlongGroupId;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
index a588fabd9..ed3ad688e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
@@ -15,27 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+@Getter
+@Setter
+@ApiModel("Inlong group heartbeat response")
+public class GroupHeartbeatResponse {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
+    @ApiModelProperty
+    private String instance;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private long reportTime;
+
+    @ApiModelProperty
+    private String inlongGroupId;
+
+    @ApiModelProperty
+    private String statusHeartbeat;
+
+    @ApiModelProperty
+    private String metricHeartbeat;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
index a588fabd9..7f9bed089 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
@@ -15,27 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class HeartbeatReportRequest {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
+    private String instance;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    private long reportTimestamp;
+
+    /*
+     * component
+     */
+    private ComponentHeartbeat componentHeartbeat;
+
+    /*
+     * group
+     */
+    private List<GroupHeartbeat> groupHeartbeats;
+
+    /*
+     * stream
+     */
+    private List<StreamHeartbeat> streamHeartbeats;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
index a588fabd9..391c62221 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
@@ -15,27 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class StreamHeartbeat {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    private String inlongGroupId;
 
-    @Autowired
-    DataSource dataSource;
+    private String inlongStreamId;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    private String statusHeartbeat;
+
+    private String metricHeartbeat;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
index a588fabd9..08ead5382 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
@@ -15,27 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.inlong.manager.common.beans.PageRequest;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong stream heartbeats query request")
+public class StreamHeartbeatPageRequest
+        extends PageRequest {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
+    @ApiModelProperty
+    private String instance;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private String inlongGroupId;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
index a588fabd9..c5420a0b1 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
@@ -15,27 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong stream heartbeat query request")
+public class StreamHeartbeatRequest {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
+    @ApiModelProperty
+    private String instance;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private String inlongGroupId;
+
+    @ApiModelProperty
+    private String inlongStreamId;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
index a588fabd9..4b808f090 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
@@ -15,27 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong stream heartbeat response")
+public class StreamHeartbeatResponse {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    @ApiModelProperty
+    private String component;
 
-    @Autowired
-    DataSource dataSource;
+    @ApiModelProperty
+    private String instance;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
+    @ApiModelProperty
+    private long reportTime;
+
+    @ApiModelProperty
+    private String inlongGroupId;
+
+    @ApiModelProperty
+    private String inlongStreamId;
+
+    @ApiModelProperty
+    private String statusHeartbeat;
+
+    @ApiModelProperty
+    private String metricHeartbeat;
 }
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
index a588fabd9..b2d8ccca5 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
@@ -15,27 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+package org.apache.inlong.manager.dao.entity;
+
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Data;
+
+@Data
+public class ComponentHeartbeatEntity implements Serializable {
+    private Integer id;
+
+    private String component;
+
+    private String instance;
+
+    private Date reportTime;
+
+    private Date modifyTime;
+
+    private Date createTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
index a588fabd9..a9eaa8691 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
@@ -15,27 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.dao.entity;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.io.Serializable;
+import lombok.Data;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+public class ComponentHeartbeatEntityWithBLOBs extends ComponentHeartbeatEntity implements Serializable {
+    private String statusHeartbeat;
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    private String metricHeartbeat;
 
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
index a588fabd9..230202497 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
@@ -15,27 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+package org.apache.inlong.manager.dao.entity;
+
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Data;
+
+@Data
+public class GroupHeartbeatEntity implements Serializable {
+    private Integer id;
+
+    private String component;
+
+    private String instance;
+
+    private String inlongGroupId;
+
+    private Date reportTime;
+
+    private Date modifyTime;
+
+    private Date createTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
index a588fabd9..f747cde3f 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
@@ -15,27 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.dao.entity;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.io.Serializable;
+import lombok.Data;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+public class GroupHeartbeatEntityWithBLOBs extends GroupHeartbeatEntity implements Serializable {
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    private String statusHeartbeat;
 
-    @Autowired
-    DataSource dataSource;
+    private String metricHeartbeat;
 
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
index a588fabd9..8ee2255c9 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
@@ -15,27 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+package org.apache.inlong.manager.dao.entity;
+
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Data;
+
+@Data
+public class StreamHeartbeatEntity implements Serializable {
+    private Integer id;
+
+    private String component;
+
+    private String instance;
+
+    private String inlongGroupId;
+
+    private String inlongStreamId;
+
+    private Date reportTime;
+
+    private Date modifyTime;
+
+    private Date createTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
index a588fabd9..83ceb1633 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
@@ -15,27 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.dao.entity;
 
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.io.Serializable;
+import lombok.Data;
 
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+public class StreamHeartbeatEntityWithBLOBs extends StreamHeartbeatEntity implements Serializable {
+    private String statusHeartbeat;
 
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+    private String metricHeartbeat;
 
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
index a588fabd9..2e653dc7e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
@@ -15,27 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+package org.apache.inlong.manager.dao.mapper;
+
+import java.util.List;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+
+public interface ComponentHeartbeatEntityMapper {
+    int deleteByPrimaryKey(Integer id);
+
+    int insert(ComponentHeartbeatEntityWithBLOBs record);
+
+    ComponentHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+
+    ComponentHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
+            @Param("instance") String instance);
+
+    List<ComponentHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component);
+
+    int updateByKeyWithBLOBs(ComponentHeartbeatEntityWithBLOBs record);
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
similarity index 50%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
index a588fabd9..e37062e4e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
@@ -15,27 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
-    @Autowired
-    DataSource dataSource;
-
-    @PostConstruct
-    public void initH2Function() {
-        H2FunctionsLoader.loadMysqlFunctions(dataSource);
-    }
-}
+package org.apache.inlong.manager.dao.mapper;
+
+import java.util.List;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+
+public interface GroupHeartbeatEntityMapper {
+    int deleteByPrimaryKey(Integer id);
+
+    int insert(GroupHeartbeatEntityWithBLOBs record);
+
+    GroupHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+
+    GroupHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
+            @Param("instance") String instance,
+            @Param("inlongGroupId") String inlongGroupId);
+
+    List<GroupHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
+            @Param("instance") String instance);
+
+    int updateByKeyWithBLOBs(GroupHeartbeatEntityWithBLOBs record);
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
new file mode 100644
index 000000000..2ab8293e5
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import java.util.List;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+
+public interface StreamHeartbeatEntityMapper {
+    int deleteByPrimaryKey(Integer id);
+
+    int insert(StreamHeartbeatEntityWithBLOBs record);
+
+    StreamHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+
+    StreamHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
+            @Param("instance") String instance,
+            @Param("inlongGroupId") String inlongGroupId,
+            @Param("inlongStreamId") String inlongStreamId);
+
+    List<StreamHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
+            @Param("instance") String instance,
+            @Param("inlongGroupId") String inlongGroupId);
+
+    int updateByKeyWithBLOBs(StreamHeartbeatEntityWithBLOBs record);
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index 33073bbf0..e5876c57a 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -239,6 +239,24 @@
                enableDeleteByPrimaryKey="true" enableInsert="true"
                enableCountByExample="false" enableDeleteByExample="false"
                enableSelectByExample="false" enableUpdateByExample="false"/>-->
+        <table tableName="component_heartbeat" domainObjectName="ComponentHeartbeatEntity"
+            enableSelectByPrimaryKey="true"
+            enableUpdateByPrimaryKey="true"
+            enableDeleteByPrimaryKey="true" enableInsert="true"
+            enableCountByExample="false" enableDeleteByExample="false"
+            enableSelectByExample="false" enableUpdateByExample="false"/>
+        <table tableName="group_heartbeat" domainObjectName="GroupHeartbeatEntity"
+            enableSelectByPrimaryKey="true"
+            enableUpdateByPrimaryKey="true"
+            enableDeleteByPrimaryKey="true" enableInsert="true"
+            enableCountByExample="false" enableDeleteByExample="false"
+            enableSelectByExample="false" enableUpdateByExample="false"/>
+        <table tableName="stream_heartbeat" domainObjectName="StreamHeartbeatEntity"
+            enableSelectByPrimaryKey="true"
+            enableUpdateByPrimaryKey="true"
+            enableDeleteByPrimaryKey="true" enableInsert="true"
+            enableCountByExample="false" enableDeleteByExample="false"
+            enableSelectByExample="false" enableUpdateByExample="false"/>
 
     </context>
 </generatorConfiguration>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
new file mode 100644
index 000000000..508210e3d
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper">
+  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+    <id column="id" jdbcType="INTEGER" property="id" />
+    <result column="component" jdbcType="VARCHAR" property="component" />
+    <result column="instance" jdbcType="VARCHAR" property="instance" />
+    <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
+    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+  </resultMap>
+  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
+    <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
+    <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
+  </resultMap>
+  <sql id="Base_Column_List">
+    id, component, instance, report_time, modify_time, create_time
+  </sql>
+  <sql id="Blob_Column_List">
+    status_heartbeat, metric_heartbeat
+  </sql>
+  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
+    select
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from component_heartbeat
+    where id = #{id,jdbcType=INTEGER}
+  </select>
+  <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
+    select
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from component_heartbeat
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
+  </select>
+  <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
+    select
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from component_heartbeat
+    where component = #{component,jdbcType=VARCHAR}
+  </select>
+  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    delete from component_heartbeat
+    where id = #{id,jdbcType=INTEGER}
+  </delete>
+  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
+    insert into component_heartbeat (component, instance,
+      report_time, status_heartbeat, metric_heartbeat)
+    values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+      #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
+  </insert>
+  <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
+    update component_heartbeat
+    set
+      report_time = #{reportTime,jdbcType=TIMESTAMP},
+      status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
+      metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
+  </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
new file mode 100644
index 000000000..bbba5e324
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper">
+  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
+    <id column="id" jdbcType="INTEGER" property="id" />
+    <result column="component" jdbcType="VARCHAR" property="component" />
+    <result column="instance" jdbcType="VARCHAR" property="instance" />
+    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+    <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
+    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+  </resultMap>
+  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
+    <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
+    <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
+  </resultMap>
+  <sql id="Base_Column_List">
+    id, component, instance, inlong_group_id, report_time, modify_time, create_time
+  </sql>
+  <sql id="Blob_Column_List">
+    status_heartbeat, metric_heartbeat
+  </sql>
+  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
+    select 
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from group_heartbeat
+    where id = #{id,jdbcType=INTEGER}
+  </select>
+  <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
+    select
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from group_heartbeat
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+  </select>
+  <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
+    select
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from group_heartbeat
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
+  </select>
+  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    delete from group_heartbeat
+    where id = #{id,jdbcType=INTEGER}
+  </delete>
+  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
+    insert into group_heartbeat (component, instance,
+      inlong_group_id, report_time, status_heartbeat, metric_heartbeat
+      )
+    values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+      #{inlongGroupId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR}
+      )
+  </insert>
+  <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
+    update group_heartbeat
+    set
+      report_time = #{reportTime,jdbcType=TIMESTAMP},
+      status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
+      metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+  </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
new file mode 100644
index 000000000..c3762875e
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper">
+  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+    <id column="id" jdbcType="INTEGER" property="id" />
+    <result column="component" jdbcType="VARCHAR" property="component" />
+    <result column="instance" jdbcType="VARCHAR" property="instance" />
+    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+    <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+    <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
+    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+  </resultMap>
+  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
+    <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
+    <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
+  </resultMap>
+  <sql id="Base_Column_List">
+    id, component, instance, inlong_group_id, inlong_stream_id, report_time, modify_time, 
+    create_time
+  </sql>
+  <sql id="Blob_Column_List">
+    status_heartbeat, metric_heartbeat
+  </sql>
+  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
+    select 
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from stream_heartbeat
+    where id = #{id,jdbcType=INTEGER}
+  </select>
+  <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
+    select
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from stream_heartbeat
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+  </select>
+  <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
+    select
+    <include refid="Base_Column_List" />
+    ,
+    <include refid="Blob_Column_List" />
+    from stream_heartbeat
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+  </select>
+  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+    delete from stream_heartbeat
+    where id = #{id,jdbcType=INTEGER}
+  </delete>
+  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
+    insert into stream_heartbeat (component, instance,
+      inlong_group_id, inlong_stream_id, report_time, status_heartbeat, metric_heartbeat)
+    values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+      #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, 
+      #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
+  </insert>
+  <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
+    update stream_heartbeat
+    set
+      report_time = #{reportTime,jdbcType=TIMESTAMP},
+      status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
+      metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
+    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+  </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
new file mode 100644
index 000000000..ea8acf4aa
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+
+/**
+ * Heartbeat Service
+ */
+public interface HeartbeatService {
+
+    /**
+     * report Heartbeat
+     * @param request request
+     * @return
+     */
+    String reportHeartbeatInfo(HeartbeatReportRequest request);
+
+    /**
+     * get Component HeartbeatInfo
+     * @param component component
+     * @param instance instance
+     * @return ComponentHeartBeatResponse
+     */
+    ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
+            String instance);
+
+    /**
+     * get Group HeartbeatInfo
+     * @param component component
+     * @param instance instance
+     * @param inlongGroupId inlongGroupId
+     * @return  GroupHeartbeatResponse
+     */
+    GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
+            String instance, String inlongGroupId);
+
+    /**
+     * get Stream HeartbeatInfo
+     * @param component component
+     * @param instance instance
+     * @param inlongGroupId inlongGroupId
+     * @param inlongStreamId inlongStreamId
+     * @return StreamHeartBeatResponse
+     */
+    StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
+            String instance, String inlongGroupId, String inlongStreamId);
+
+    /**
+     * get Component HeartbeatInfos by page
+     * @param component  component
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return one page of ComponentHeartBeatResponse
+     */
+    PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component, int pageNum,
+            int pageSize);
+
+    /**
+     * get Group HeartbeatInfos by page
+     * @param component component
+     * @param instance instance
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return one page of GroupHeartbeatResponse
+     */
+    PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
+            String instance, int pageNum, int pageSize);
+
+    /**
+     * get Stream HeartbeatInfos
+     * @param component component
+     * @param instance instance
+     * @param inlongGroupId inlongGroupId
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return one page of GroupHeartbeatResponse
+     */
+    PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
+            String instance, String inlongGroupId, int pageNum, int pageSize);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
new file mode 100644
index 000000000..3b46c771f
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import com.google.gson.Gson;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ *  report or query heartbeat info
+ */
+@Service
+public class HeartbeatServiceImpl
+        implements HeartbeatService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatServiceImpl.class);
+
+    @Autowired
+    private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
+
+    @Autowired
+    private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
+
+    @Autowired
+    private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
+
+    private static Gson gson = new Gson();
+
+    /**
+     * heartbeat common handler
+     * @param request request
+     * @return
+     */
+    @Override
+    public String reportHeartbeatInfo(HeartbeatReportRequest request) {
+        if (request != null && StringUtils.isNotEmpty(request.getComponent())) {
+            ComponentTypeEnum componentType =
+                    ComponentTypeEnum.valueOf(request.getComponent());
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                default:
+                    updateByDefaultWay(request);
+            }
+        } else {
+            LOGGER.warn("request is null or component [{}] is not supported",
+                    request.getComponent());
+        }
+        return "Success";
+    }
+
+    /**
+     * update Heartbeat Data
+     * @param request  request
+     */
+    private void updateComponentHeartbeatData(HeartbeatReportRequest request) {
+        if (request == null || request.getComponentHeartbeat() == null) {
+            return;
+        }
+        ComponentHeartbeatEntityWithBLOBs entity = new ComponentHeartbeatEntityWithBLOBs();
+        entity.setComponent(request.getComponent());
+        entity.setInstance(request.getInstance());
+        entity.setReportTime(new Date(request.getReportTimestamp()));
+        if (StringUtils.isNotEmpty(request.getComponentHeartbeat().getStatusHeartbeat())) {
+            entity.setStatusHeartbeat(request.getComponentHeartbeat().getStatusHeartbeat());
+        }
+        if (StringUtils.isNotEmpty(request.getComponentHeartbeat().getMetricHeartbeat())) {
+            entity.setMetricHeartbeat(request.getComponentHeartbeat().getMetricHeartbeat());
+        }
+        int count = componentHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
+        if (count == 0) {
+            componentHeartbeatEntityMapper.insert(entity);
+        }
+    }
+
+    /**
+     * update Component StaticData
+     * @param request request
+     */
+    private void updateGroupHeartbeatData(HeartbeatReportRequest request) {
+        List<GroupHeartbeat> list = request.getGroupHeartbeats();
+        if (list != null) {
+            for (GroupHeartbeat info : list) {
+                GroupHeartbeatEntityWithBLOBs entity = new GroupHeartbeatEntityWithBLOBs();
+                entity.setComponent(request.getComponent());
+                entity.setInstance(request.getInstance());
+                entity.setReportTime(new Date(request.getReportTimestamp()));
+                entity.setInlongGroupId(info.getInlongGroupId());
+                entity.setMetricHeartbeat(info.getMetricHeartbeat());
+                entity.setStatusHeartbeat(info.getStatusHeartbeat());
+                int count = groupHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
+                if (count == 0) {
+                    groupHeartbeatEntityMapper.insert(entity);
+                }
+            }
+        }
+    }
+
+    /**
+     * update Stream Status Data
+     * @param request request
+     */
+    private void updateStreamHeartBeatData(HeartbeatReportRequest request) {
+        List<StreamHeartbeat> list = request.getStreamHeartbeats();
+        if (list != null) {
+            for (StreamHeartbeat info : list) {
+                StreamHeartbeatEntityWithBLOBs entity = new StreamHeartbeatEntityWithBLOBs();
+                entity.setComponent(request.getComponent());
+                entity.setInstance(request.getInstance());
+                entity.setReportTime(new Date(request.getReportTimestamp()));
+                entity.setInlongGroupId(info.getInlongGroupId());
+                entity.setInlongStreamId(info.getInlongStreamId());
+                entity.setMetricHeartbeat(info.getMetricHeartbeat());
+                entity.setStatusHeartbeat(info.getStatusHeartbeat());
+                int count = streamHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
+                if (count == 0) {
+                    streamHeartbeatEntityMapper.insert(entity);
+                }
+            }
+        }
+    }
+
+    /**
+     * get heartbeat info
+     * @param component componentTypeName
+     * @param instance instanceIdentifier
+     * @return heartbeat info
+     */
+    @Override
+    public ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
+            String instance) {
+        if (component != null && StringUtils.isNotEmpty(component)) {
+            ComponentTypeEnum componentType =
+                    ComponentTypeEnum.valueOf(component);
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                default:
+                    return getComponentHeartbeatInfoByDefaultWay(component, instance);
+            }
+        } else {
+            LOGGER.warn("request is null or component type is null");
+        }
+        return null;
+    }
+
+    /**
+     * get heartbeat static info
+     * @param component componentTypeName
+     * @param instance instanceIdentifier
+     * @param inlongGroupId inlongGroupId
+     * @return heartbeatStaticInfoResponse
+     */
+    @Override
+    public GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
+            String instance, String inlongGroupId) {
+        if (component != null && StringUtils.isNotEmpty(component)) {
+            ComponentTypeEnum componentType =
+                    ComponentTypeEnum.valueOf(component);
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                default:
+                    return getGroupHeartbeatByDefaultWay(component, instance, inlongGroupId);
+            }
+        } else {
+            LOGGER.warn("request is null or component type is null");
+        }
+        return null;
+    }
+
+    /**
+     * get heartbeat status info
+     * @param component componentTypeName
+     * @param instance instanceIdentifier
+     * @param inlongGroupId inlongGroupId
+     * @param inlongStreamId inlongStreamId
+     * @return heartbeatStatusInfoResponse
+     */
+    @Override
+    public StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
+            String instance, String inlongGroupId, String inlongStreamId) {
+        if (component != null && StringUtils.isNotEmpty(component)) {
+            ComponentTypeEnum componentType =
+                    ComponentTypeEnum.valueOf(component);
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                default:
+                    return getStreamHeartbeatByDefaultWay(component, instance,
+                            inlongGroupId, inlongStreamId);
+            }
+        } else {
+            LOGGER.warn("request is null or component type is null");
+        }
+        return null;
+    }
+
+    /**
+     * get component heartbeat infos
+     * @param component  component
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return pageInfos
+     */
+    @Override
+    public PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component,
+            int pageNum, int pageSize) {
+        if (component != null && StringUtils.isNotEmpty(component)) {
+            ComponentTypeEnum componentType =
+                    ComponentTypeEnum.valueOf(component);
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                default:
+                    return getComponentHeartbeatInfosByDefaultWay(component, pageNum, pageSize);
+            }
+        } else {
+            LOGGER.warn("request is null or component type is null");
+        }
+        return null;
+    }
+
+    /**
+     * get group heartbeat infos
+     * @param component component
+     * @param instance instance
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return pageInfo
+     */
+    @Override
+    public PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
+            String instance, int pageNum, int pageSize) {
+        if (component != null && StringUtils.isNotEmpty(component)) {
+            ComponentTypeEnum componentType =
+                    ComponentTypeEnum.valueOf(component);
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                default:
+                    return getGroupHeartbeatsByDefaultWay(component, instance,
+                            pageNum, pageSize);
+            }
+        } else {
+            LOGGER.warn("request is null or component type is null");
+        }
+        return null;
+    }
+
+    /**
+     * get stream heartbeat infos
+     * @param component component
+     * @param instance instance
+     * @param inlongGroupId inlongGroupId
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return pageInfo
+     */
+    @Override
+    public PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
+            String instance, String inlongGroupId, int pageNum, int pageSize) {
+        if (component != null && StringUtils.isNotEmpty(component)) {
+            ComponentTypeEnum componentType =
+                    ComponentTypeEnum.valueOf(component);
+            switch (componentType) {
+                case Sort:
+                case DataProxy:
+                case Agent:
+                case Cache:
+                default:
+                    return getStreamHeartbeatsByDefaultWay(component, instance,
+                            inlongGroupId, pageNum, pageSize);
+            }
+        } else {
+            LOGGER.warn("request is null or component type is null");
+        }
+        return null;
+    }
+
+    /**
+     * update By DefaultWay
+     * @param request request
+     */
+    private void updateByDefaultWay(HeartbeatReportRequest request) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("InlongHeartbeatReportRequest json = {}", gson.toJson(request));
+        }
+        updateComponentHeartbeatData(request);
+        updateGroupHeartbeatData(request);
+        updateStreamHeartBeatData(request);
+    }
+
+    /**
+     * get Heartbeat InfoByDefaultWay
+     * @param component component
+     * @param instance instance
+     * @return HeartbeatInfoResponse
+     */
+    private ComponentHeartbeatResponse getComponentHeartbeatInfoByDefaultWay(String component,
+            String instance) {
+        ComponentHeartbeatEntityWithBLOBs result =
+                componentHeartbeatEntityMapper.selectByKey(component, instance);
+        ComponentHeartbeatResponse componentHeartBeatResponse = null;
+        if (result != null) {
+            componentHeartBeatResponse = new ComponentHeartbeatResponse();
+            componentHeartBeatResponse.setComponent(result.getComponent());
+            componentHeartBeatResponse.setInstance(result.getInstance());
+            componentHeartBeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
+            componentHeartBeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
+            componentHeartBeatResponse.setReportTime(result.getReportTime().getTime());
+        }
+        return componentHeartBeatResponse;
+    }
+
+    /**
+     * get heartbeat StaticInfo ByDefaultWay
+     * @param component component
+     * @param instance instance
+     * @param inlongGroupId inlongGroupId
+     * @return heartbeatStaticInfoResponse
+     */
+    private GroupHeartbeatResponse getGroupHeartbeatByDefaultWay(String component,
+            String instance, String inlongGroupId) {
+
+        GroupHeartbeatEntityWithBLOBs result =
+                groupHeartbeatEntityMapper.selectByKey(component,
+                        instance, inlongGroupId);
+        GroupHeartbeatResponse groupHeartbeatResponse = null;
+        if (result != null) {
+            groupHeartbeatResponse = new GroupHeartbeatResponse();
+            groupHeartbeatResponse.setInlongGroupId(inlongGroupId);
+            groupHeartbeatResponse.setComponent(result.getComponent());
+            groupHeartbeatResponse.setInstance(result.getInstance());
+            groupHeartbeatResponse.setReportTime(result.getReportTime().getTime());
+            groupHeartbeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
+            groupHeartbeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
+        }
+        return groupHeartbeatResponse;
+    }
+
+    /**
+     * get Heartbeat StatusInfo ByDefaultWay
+     * @param componentTypeName componentTypeName
+     * @param instanceIdentifier instanceIdentifier
+     * @param inlongGroupId inlongGroupId
+     * @return heartbeatStatusInfoResponse
+     */
+    private StreamHeartbeatResponse getStreamHeartbeatByDefaultWay(String componentTypeName,
+            String instanceIdentifier, String inlongGroupId, String inlongStreamId) {
+        StreamHeartbeatEntityWithBLOBs result =
+                streamHeartbeatEntityMapper.selectByKey(componentTypeName,
+                        instanceIdentifier, inlongGroupId, inlongStreamId);
+        StreamHeartbeatResponse streamHeartBeatResponse = null;
+        if (result != null) {
+            streamHeartBeatResponse = new StreamHeartbeatResponse();
+            streamHeartBeatResponse.setComponent(result.getComponent());
+            streamHeartBeatResponse.setInstance(result.getInstance());
+            streamHeartBeatResponse.setReportTime(result.getReportTime().getTime());
+            streamHeartBeatResponse.setInlongGroupId(result.getInlongGroupId());
+            streamHeartBeatResponse.setInlongStreamId(result.getInlongStreamId());
+            streamHeartBeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
+            streamHeartBeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
+        }
+        return streamHeartBeatResponse;
+    }
+
+    /**
+     * get Heartbeat InfoByDefaultWay
+     * @param component component
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return HeartbeatInfoResponse
+     */
+    private PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfosByDefaultWay(String component,
+            int pageNum, int pageSize) {
+        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+        PageHelper.startPage(pageNum, pageSize);
+
+        Page<ComponentHeartbeatEntityWithBLOBs> entityPage = (Page<ComponentHeartbeatEntityWithBLOBs>)
+                componentHeartbeatEntityMapper.selectHeartbeats(component);
+
+        List<ComponentHeartbeatResponse> componentHeartBeatResponses = new ArrayList<>();
+        PageInfo<ComponentHeartbeatResponse> pageInfo;
+        if (entityPage != null) {
+            componentHeartBeatResponses = CommonBeanUtils
+                    .copyListProperties(entityPage, ComponentHeartbeatResponse::new);
+        }
+        pageInfo = new PageInfo<>(componentHeartBeatResponses);
+        pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+        return pageInfo;
+    }
+
+    /**
+     * get heartbeat StaticInfo ByDefaultWay
+     * @param component component
+     * @param instance instance
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return heartbeatStaticInfoResponse
+     */
+    private PageInfo<GroupHeartbeatResponse> getGroupHeartbeatsByDefaultWay(String component,
+            String instance, int pageNum, int pageSize) {
+        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+        PageHelper.startPage(pageNum, pageSize);
+        Page<GroupHeartbeatEntityWithBLOBs> entityPage = (Page<GroupHeartbeatEntityWithBLOBs>)
+                groupHeartbeatEntityMapper.selectHeartbeats(component, instance);
+        List<GroupHeartbeatResponse>  groupHeartbeatResponses = new ArrayList<>();
+        PageInfo<GroupHeartbeatResponse> pageInfo;
+        if (entityPage != null) {
+            groupHeartbeatResponses = CommonBeanUtils
+                    .copyListProperties(entityPage, GroupHeartbeatResponse::new);
+        }
+        pageInfo = new PageInfo<>(groupHeartbeatResponses);
+        pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+        return pageInfo;
+    }
+
+    /**
+     * get Heartbeat StatusInfo ByDefaultWay
+     * @param component component
+     * @param instance instance
+     * @param inlongGroupId inlongGroupId
+     * @param pageNum pageNum
+     * @param pageSize pageSize
+     * @return heartbeatStatusInfoResponse
+     */
+    private PageInfo<StreamHeartbeatResponse> getStreamHeartbeatsByDefaultWay(String component,
+            String instance, String inlongGroupId, int pageNum, int pageSize) {
+        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+        Preconditions.checkNotNull(instance, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+        Preconditions.checkNotNull(inlongGroupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        PageHelper.startPage(pageNum, pageSize);
+        Page<StreamHeartbeatEntityWithBLOBs> entityPage = (Page<StreamHeartbeatEntityWithBLOBs>)
+                streamHeartbeatEntityMapper.selectHeartbeats(component, instance, inlongGroupId);
+        List<StreamHeartbeatResponse>  streamHeartBeatResponses = new ArrayList<>();
+        PageInfo<StreamHeartbeatResponse> pageInfo;
+        if (entityPage != null) {
+            streamHeartBeatResponses = CommonBeanUtils
+                    .copyListProperties(entityPage, StreamHeartbeatResponse::new);
+        }
+        pageInfo = new PageInfo<>(streamHeartBeatResponses);
+        pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+        return pageInfo;
+    }
+}
diff --git a/inlong-manager/manager-test/pom.xml b/inlong-manager/manager-test/pom.xml
index 935b53192..1feec8fa6 100644
--- a/inlong-manager/manager-test/pom.xml
+++ b/inlong-manager/manager-test/pom.xml
@@ -49,15 +49,16 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-test</artifactId>
         </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-test</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
index a588fabd9..a9b9f3a54 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.test;
 
+import org.junit.runner.RunWith;
 import org.mvnsearch.h2.H2FunctionsLoader;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -25,10 +26,12 @@ import org.springframework.test.context.ActiveProfiles;
 
 import javax.annotation.PostConstruct;
 import javax.sql.DataSource;
+import org.springframework.test.context.junit4.SpringRunner;
 
 @ActiveProfiles(value = {"test"})
 @EnableConfigurationProperties
 @ComponentScan(basePackages = "org.apache.inlong.manager")
+@RunWith(SpringRunner.class)
 public class BaseTest {
 
     @Autowired
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 43075dd08..63ccca42b 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -1061,4 +1061,89 @@ CREATE TABLE `sort_source_config`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
 
+
+-- ----------------------------
+-- Table structure for config log report
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_config_log`;
+CREATE TABLE `stream_config_log`
+(
+    `ip`               varchar(24)  NOT NULL COMMENT 'client host ip',
+    `version`          varchar(64)           DEFAULT NULL COMMENT 'client version',
+    `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong stream ID for consumption',
+    `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+    `component_name`   varchar(64)  NOT NULL DEFAULT '' COMMENT 'current report info component name',
+    `config_name`      varchar(64)  NOT NULL DEFAULT '' COMMENT 'massage in heartbeat request',
+    `log_type`         int(1)                DEFAULT 0 COMMENT '0 normal, 1 error',
+    `log_info`         text                  DEFAULT NULL COMMENT 'massage in heartbeat request',
+    `report_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`ip`, `config_name`, `component_name`, `log_type`, `inlong_stream_id`, `inlong_group_id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
+
+-- ----------------------------
+-- Table structure for inlong beat heart
+-- ----------------------------
+DROP TABLE IF EXISTS `component_heartbeat`;
+CREATE TABLE `component_heartbeat`
+(
+    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
+    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
+    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
+    `status_heartbeat`        text         NOT NULL COMMENT 'component status info',
+    `metric_heartbeat`        text         NOT NULL COMMENT 'component metric info',
+    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `index_beat_heart` (`component`, `instance`),
+    KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong stream status info
+-- ----------------------------
+DROP TABLE IF EXISTS `group_heartbeat`;
+CREATE TABLE `group_heartbeat`
+(
+    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
+    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
+    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
+    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+    `status_heartbeat`        text         NOT NULL COMMENT 'Inlong group status info',
+    `metric_heartbeat`        text         NOT NULL COMMENT 'Inlong group metric info',
+    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
+    KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong component static info
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_heartbeat`;
+CREATE TABLE `stream_heartbeat`
+(
+    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
+    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
+    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
+    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+    `inlong_stream_id`        varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
+    `status_heartbeat`        text         NOT NULL COMMENT 'status info',
+    `metric_heartbeat`        text         NOT NULL COMMENT 'static info',
+    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
+    KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+-- ----------------------------
+
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index ac4ad652c..f99f2bc87 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1133,5 +1133,67 @@ CREATE TABLE `stream_config_log`
   DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
 
 -- ----------------------------
+-- Table structure for inlong beat heart
+-- ----------------------------
+DROP TABLE IF EXISTS `component_heartbeat`;
+CREATE TABLE `component_heartbeat`
+(
+    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
+    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
+    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
+    `status_heartbeat`        text         NOT NULL COMMENT 'component status info',
+    `metric_heartbeat`        text         NOT NULL COMMENT 'component metric info',
+    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `index_beat_heart` (`component`, `instance`),
+    KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong stream status info
+-- ----------------------------
+DROP TABLE IF EXISTS `group_heartbeat`;
+CREATE TABLE `group_heartbeat`
+(
+    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
+    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
+    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
+    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+    `status_heartbeat`        text         NOT NULL COMMENT 'Inlong group status info',
+    `metric_heartbeat`        text         NOT NULL COMMENT 'Inlong group metric info',
+    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
+    KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong component static info
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_heartbeat`;
+CREATE TABLE `stream_heartbeat`
+(
+    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
+    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
+    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
+    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+    `inlong_stream_id`        varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
+    `status_heartbeat`        text         NOT NULL COMMENT 'status info',
+    `metric_heartbeat`        text         NOT NULL COMMENT 'static info',
+    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
+    KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+-- ----------------------------
 
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
new file mode 100644
index 000000000..4087499c2
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import com.github.pagehelper.PageInfo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/heartbeat/")
+@Api(tags = "Heartbeat")
+public class HeartbeatInfoController {
+
+    @Autowired
+    private HeartbeatService heartbeatService;
+
+    @RequestMapping(value = "/component/info", method = RequestMethod.POST)
+    @ApiOperation(value = "query component heartbeat")
+    public Response<ComponentHeartbeatResponse> queryComponentHeartbeatInfo(@RequestBody
+            ComponentHeartbeatRequest info) {
+        ComponentHeartbeatResponse response =
+                heartbeatService.getComponentHeartbeatInfo(info.getComponent(),
+                info.getInstance());
+        if (response == null) {
+            return Response.fail("Not found msg!");
+        } else {
+            return Response.success(response);
+        }
+    }
+
+    @RequestMapping(value = "/group/info", method = RequestMethod.POST)
+    @ApiOperation(value = "query group heartbeat")
+    public Response<GroupHeartbeatResponse> queryGroupHeartbeatInfo(@RequestBody
+            GroupHeartbeatRequest info) {
+        GroupHeartbeatResponse response = heartbeatService
+                .getGroupHeartbeatInfo(info.getComponent(), info.getInstance(),
+                        info.getInlongGroupId());
+        if (response == null) {
+            return Response.fail("Not found msg!");
+        } else {
+            return Response.success(response);
+        }
+    }
+
+    @RequestMapping(value = "/stream/info", method = RequestMethod.POST)
+    @ApiOperation(value = "query stream heartbeat")
+    public Response<StreamHeartbeatResponse> queryStreamHeartbeat(@RequestBody
+            StreamHeartbeatRequest info) {
+        StreamHeartbeatResponse response = heartbeatService
+                .getStreamHeartbeatInfo(info.getComponent(),
+                info.getInstance(), info.getInlongGroupId(), info.getInlongStreamId());
+        if (response == null) {
+            return Response.fail("Not found msg!");
+        } else {
+            return Response.success(response);
+        }
+    }
+
+    @RequestMapping(value = "/component/list", method = RequestMethod.POST)
+    @ApiOperation(value = "query component heartbeats")
+    public Response<PageInfo<ComponentHeartbeatResponse>> queryComponentHeartbeatInfos(@RequestBody
+            ComponentHeartbeatPageRequest info) {
+        PageInfo<ComponentHeartbeatResponse> responses =
+                heartbeatService.getComponentHeartbeatInfos(info.getComponent(), info.getPageNum(),
+                        info.getPageSize());
+        if (responses == null) {
+            return Response.fail("Not found msg!");
+        } else {
+            return Response.success(responses);
+        }
+    }
+
+    @RequestMapping(value = "/group/list", method = RequestMethod.POST)
+    @ApiOperation(value = "query group heartbeats")
+    public Response<PageInfo<GroupHeartbeatResponse>> queryGroupHeartbeatInfos(@RequestBody
+            GroupHeartbeatPageRequest info) {
+        PageInfo<GroupHeartbeatResponse> responses = heartbeatService
+                .getGroupHeartbeatInfos(info.getComponent(), info.getInstance(),
+                        info.getPageNum(), info.getPageSize());
+        if (responses == null) {
+            return Response.fail("Not found msg!");
+        } else {
+            return Response.success(responses);
+        }
+    }
+
+    @RequestMapping(value = "/stream/list", method = RequestMethod.POST)
+    @ApiOperation(value = "query stream heartbeats")
+    public Response<PageInfo<StreamHeartbeatResponse>> queryStreamHeartbeats(@RequestBody
+            StreamHeartbeatPageRequest info) {
+        PageInfo<StreamHeartbeatResponse> responses = heartbeatService
+                .getStreamHeartbeatInfos(info.getComponent(), info.getInstance(),
+                        info.getInlongGroupId(), info.getPageNum(), info.getPageSize());
+        if (responses == null) {
+            return Response.fail("Not found msg!");
+        } else {
+            return Response.success(responses);
+        }
+    }
+
+}
+
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
new file mode 100644
index 000000000..ea6cd0e42
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller.openapi;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/openapi/heartbeat")
+@Api(tags = "Heartbeat")
+public class HeartbeatController {
+
+    @Autowired
+    private HeartbeatService heartBeatService;
+
+    @PostMapping(value = "/report",
+            produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
+    @ApiOperation(value = "heartbeat report")
+    public Response<String> reportHeartbeat(@RequestBody HeartbeatReportRequest info) {
+        return Response.success(heartBeatService.reportHeartbeatInfo(info));
+    }
+}
+
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
new file mode 100644
index 000000000..7ed34b8b0
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller.openapi;
+
+import static org.mockito.BDDMockito.given;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.apache.inlong.manager.service.core.impl.HeartbeatServiceImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+public class HeartbeatControllerTest {
+
+    @InjectMocks
+    private HeartbeatService heartbeatService = new HeartbeatServiceImpl();
+
+    @Mock
+    private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
+
+    @Mock
+    private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
+
+    @Mock
+    private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+
+        ComponentHeartbeatEntityWithBLOBs componentHeartbeatEntityWithBLOBs =
+                new ComponentHeartbeatEntityWithBLOBs();
+        componentHeartbeatEntityWithBLOBs.setComponent("Sort");
+        componentHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
+        componentHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"inlongGroupId\":\"groupId\","
+                + "\"componentStaticInfo\":\"\"}]");
+        componentHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"inlongGroupId\":\"groupId\","
+                + "\"streamStatusInfo\":\"\"}]");
+        componentHeartbeatEntityWithBLOBs.setReportTime(new Date());
+        Page<ComponentHeartbeatEntityWithBLOBs> componentPage = new Page<>();
+        componentPage.add(componentHeartbeatEntityWithBLOBs);
+        componentPage.setTotal(1);
+        given(componentHeartbeatEntityMapper.insert(new ComponentHeartbeatEntityWithBLOBs())).willReturn(1);
+        given(componentHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
+                Mockito.anyString())).willReturn(componentHeartbeatEntityWithBLOBs);
+        given(componentHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString()))
+                .willReturn(componentPage);
+
+        GroupHeartbeatEntityWithBLOBs groupHeartbeatEntityWithBLOBs =
+                new GroupHeartbeatEntityWithBLOBs();
+        groupHeartbeatEntityWithBLOBs.setComponent("Sort");
+        groupHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
+        groupHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+                + ": \"10\"},"
+                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        groupHeartbeatEntityWithBLOBs.setReportTime(new Date());
+        groupHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+                + ": \"10\"},"
+                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        Page<GroupHeartbeatEntityWithBLOBs> groupPage = new Page<>();
+        groupPage.add(groupHeartbeatEntityWithBLOBs);
+        groupPage.setTotal(1);
+        given(groupHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString())).willReturn(groupHeartbeatEntityWithBLOBs);
+        given(groupHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString(),
+                Mockito.anyString())).willReturn(groupPage);
+
+        StreamHeartbeatEntityWithBLOBs streamHeartbeatEntityWithBLOBs = new StreamHeartbeatEntityWithBLOBs();
+        streamHeartbeatEntityWithBLOBs.setComponent("Sort");
+        streamHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
+        streamHeartbeatEntityWithBLOBs.setInlongGroupId("group1");
+        streamHeartbeatEntityWithBLOBs.setInlongStreamId("test_test");
+        streamHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"statue\":\"running\"}]");
+        streamHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"outMsg\":\"1\",\"inMsg\":2}]");
+        streamHeartbeatEntityWithBLOBs.setReportTime(new Date());
+
+        Page<StreamHeartbeatEntityWithBLOBs> streamPage = new Page<>();
+        streamPage.add(streamHeartbeatEntityWithBLOBs);
+        streamPage.setTotal(1);
+
+        given(streamHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+                .willReturn(streamHeartbeatEntityWithBLOBs);
+        given(streamHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString()))
+                .willReturn(streamPage);
+    }
+
+    @Test
+    public void testAddHeartbeat() throws Exception {
+
+        HeartbeatReportRequest request = new HeartbeatReportRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        request.setReportTimestamp(Instant.now().toEpochMilli());
+
+        ComponentHeartbeat componentHeartbeat = new ComponentHeartbeat();
+        componentHeartbeat.setMetricHeartbeat("{\"mem\":\"100\"}");
+        componentHeartbeat.setStatusHeartbeat("{\"runningTime\":\"10h.35m\","
+                + "\"status\":\"10h.35m\","
+                + "\"groupIds\":\"group1,group2\"}");
+
+        List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
+        GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
+        groupHeartbeat.setInlongGroupId("group1");
+        groupHeartbeat.setStatusHeartbeat("[{\"status\":\"running\",\"streamIds\":\"1,2,3,4\"}]");
+        request.setGroupHeartbeats(groupHeartbeats);
+
+        StreamHeartbeat streamHeartbeat = new StreamHeartbeat();
+        streamHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+                + ": \"10\"},"
+                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        streamHeartbeat.setStatusHeartbeat("{}");
+        streamHeartbeat.setInlongGroupId("group1");
+        streamHeartbeat.setInlongStreamId("1");
+        List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
+        streamHeartbeats.add(streamHeartbeat);
+        request.setStreamHeartbeats(streamHeartbeats);
+
+        Assert.assertEquals("Success", heartbeatService.reportHeartbeatInfo(request));
+    }
+
+    @Test
+    public void testQueryComponentHeartbeat() throws Exception {
+        ComponentHeartbeatResponse response
+                = heartbeatService.getComponentHeartbeatInfo("Sort", "127.0.0.1");
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    @Test
+    public void testQueryGroupHeartbeat() throws Exception {
+        GroupHeartbeatResponse response
+                = heartbeatService.getGroupHeartbeatInfo("Sort",
+                "127.0.0.1", "group1");
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    @Test
+    public void testQueryStreamHeartbeat() throws Exception {
+        StreamHeartbeatResponse response =
+                heartbeatService.getStreamHeartbeatInfo("Sort",
+                        "127.0.0.1", "group1", "stream1");
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    @Test
+    public void testQueryComponentHeartbeatPage() throws Exception {
+        PageInfo<ComponentHeartbeatResponse> pageResponse
+                = heartbeatService.getComponentHeartbeatInfos("Sort", 1,
+                10);
+        Assert.assertEquals(1, pageResponse.getTotal());
+    }
+
+    @Test
+    public void testQueryGroupHeartbeatPage() throws Exception {
+        PageInfo<GroupHeartbeatResponse> pageResponse
+                = heartbeatService.getGroupHeartbeatInfos("Sort",
+                "127.0.0.1", 1, 10);
+        Assert.assertEquals(1, pageResponse.getTotal());
+    }
+
+    @Test
+    public void testQueryStreamHeartbeatPage() throws Exception {
+        PageInfo<StreamHeartbeatResponse> pageResponse =
+                heartbeatService.getStreamHeartbeatInfos("Sort",
+                        "127.0.0.1", "group1", 1, 10);
+        Assert.assertEquals(1, pageResponse.getTotal());
+    }
+}
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 4c3e6b0e1..7ae509f9f 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -127,6 +127,13 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
                 <version>${plugin.surefire.version}</version>
+<!--                <dependencies>-->
+<!--                    <dependency>-->
+<!--                        <groupId>org.apache.maven.surefire</groupId>-->
+<!--                        <artifactId>surefire-junit47</artifactId>-->
+<!--                        <version>${plugin.surefire.version}</version>-->
+<!--                    </dependency>-->
+<!--                </dependencies>-->
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>