You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/28 11:39:37 UTC

[incubator-inlong] 06/07: [INLONG-4000][Manager] Refactor the implementations of heartbeat interfaces (#4002)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 05820029e108c37ac951e26e81b71bf6668ef07d
Author: healchow <he...@gmail.com>
AuthorDate: Thu Apr 28 15:23:30 2022 +0800

    [INLONG-4000][Manager] Refactor the implementations of heartbeat interfaces (#4002)
    
    * [INLONG-4000][Manager] Refactor the implementations of heartbeat interfaces
    
    * [INLONG-4000][Manager] Remove unused imports
---
 .../inlong/common/enums/ComponentTypeEnum.java     |  10 +-
 .../common/pojo/heartbeat/ComponentHeartbeat.java  |  15 +-
 .../heartbeat/ComponentHeartbeatPageRequest.java   |  36 --
 .../pojo/heartbeat/ComponentHeartbeatRequest.java  |  37 --
 .../pojo/heartbeat/ComponentHeartbeatResponse.java |  30 +-
 .../common/pojo/heartbeat/GroupHeartbeat.java      |  16 +-
 .../pojo/heartbeat/GroupHeartbeatPageRequest.java  |  38 --
 .../pojo/heartbeat/GroupHeartbeatRequest.java      |  40 --
 .../pojo/heartbeat/GroupHeartbeatResponse.java     |  33 +-
 ...tPageRequest.java => HeartbeatPageRequest.java} |  24 +-
 ...beatRequest.java => HeartbeatQueryRequest.java} |  20 +-
 .../pojo/heartbeat/HeartbeatReportRequest.java     |  34 +-
 .../common/pojo/heartbeat/StreamHeartbeat.java     |  17 +-
 .../pojo/heartbeat/StreamHeartbeatResponse.java    |  35 +-
 .../dao/entity/ComponentHeartbeatEntity.java       |  19 +-
 .../entity/ComponentHeartbeatEntityWithBLOBs.java  |  30 --
 .../manager/dao/entity/GroupHeartbeatEntity.java   |  18 +-
 .../dao/entity/GroupHeartbeatEntityWithBLOBs.java  |  31 --
 .../manager/dao/entity/StreamHeartbeatEntity.java  |  21 +-
 .../dao/entity/StreamHeartbeatEntityWithBLOBs.java |  30 --
 .../dao/mapper/ComponentHeartbeatEntityMapper.java |  23 +-
 .../dao/mapper/GroupHeartbeatEntityMapper.java     |  26 +-
 .../dao/mapper/StreamHeartbeatEntityMapper.java    |  30 +-
 .../mappers/ComponentHeartbeatEntityMapper.xml     | 130 ++---
 .../mappers/GroupHeartbeatEntityMapper.xml         | 148 +++---
 .../mappers/StreamHeartbeatEntityMapper.xml        | 156 +++---
 .../manager/service/core/HeartbeatService.java     |  84 ++-
 .../service/core/impl/HeartbeatServiceImpl.java    | 576 +++++++--------------
 .../service/core/impl/HeartbeatServiceTest.java    | 213 ++++++++
 .../main/resources/sql/apache_inlong_manager.sql   |  95 ++--
 .../manager-web/sql/apache_inlong_manager.sql      |  76 ++-
 .../web/controller/HeartbeatController.java        |  82 +++
 .../web/controller/HeartbeatInfoController.java    | 133 -----
 ...ontroller.java => OpenHeartbeatController.java} |  15 +-
 .../openapi/HeartbeatControllerTest.java           | 210 --------
 35 files changed, 1055 insertions(+), 1476 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
index 3361ea99a..9a2330c8f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
@@ -21,16 +21,16 @@ import lombok.Getter;
 
 public enum ComponentTypeEnum {
 
-    Agent("Inlong-Agent"),
+    Agent("Agent"),
 
-    DataProxy("Inlong-DataProxy"),
+    DataProxy("DataProxy"),
 
-    Cache("Inlong-Cache"),
+    Cache("Cache"),
 
-    Sort("Inlong-Sort");
+    Sort("Sort");
 
     @Getter
-    private String name;
+    private final String name;
 
     ComponentTypeEnum(String name) {
         this.name = name;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
index 61cc41911..7b9510877 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
@@ -17,14 +17,21 @@
 
 package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import lombok.Getter;
-import lombok.Setter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-@Getter
-@Setter
+/**
+ * Component heartbeat info
+ */
+@Data
+@ApiModel("Component heartbeat info")
 public class ComponentHeartbeat {
 
+    @ApiModelProperty(value = "Status heartbeat info")
     private String statusHeartbeat;
 
+    @ApiModelProperty(value = "Metric heartbeat info")
     private String metricHeartbeat;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
deleted file mode 100644
index 06d95756f..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.inlong.manager.common.beans.PageRequest;
-
-@Getter
-@Setter
-@Data
-@ApiModel("Inlong component heartbeats query request")
-public class ComponentHeartbeatPageRequest
-        extends PageRequest {
-
-    @ApiModelProperty
-    private String component;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
deleted file mode 100644
index bb2dd27c4..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-
-@Getter
-@Setter
-@Data
-@ApiModel("Inlong component heartbeat query request")
-public class ComponentHeartbeatRequest {
-
-    @ApiModelProperty
-    private String component;
-
-    @ApiModelProperty
-    private String instance;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
index 68e944a7e..d70976512 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
@@ -17,30 +17,36 @@
 
 package org.apache.inlong.manager.common.pojo.heartbeat;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
 
-@Getter
-@Setter
+import java.util.Date;
+
+/**
+ * Component heartbeat response
+ */
 @Data
-@ApiModel("Inlong component heartbeat query response")
+@ApiModel("Component heartbeat response")
 public class ComponentHeartbeatResponse {
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
     private String component;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component instance, can be ip, name...")
     private String instance;
 
-    @ApiModelProperty
-    private long reportTime;
-
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component status heartbeat")
     private String statusHeartbeat;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component metric heartbeat")
     private String metricHeartbeat;
+
+    @ApiModelProperty(value = "Report time of heartbeat")
+    private Long reportTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
index f2e81f9ed..ab96946ea 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
@@ -17,16 +17,24 @@
 
 package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import lombok.Getter;
-import lombok.Setter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-@Getter
-@Setter
+/**
+ * Inlong group heartbeat info
+ */
+@Data
+@ApiModel("Inlong group heartbeat info")
 public class GroupHeartbeat {
 
+    @ApiModelProperty(value = "Inlong group id")
     private String inlongGroupId;
 
+    @ApiModelProperty(value = "Status heartbeat info")
     private String statusHeartbeat;
 
+    @ApiModelProperty(value = "Metric heartbeat info")
     private String metricHeartbeat;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
deleted file mode 100644
index 23e72f3d0..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.inlong.manager.common.beans.PageRequest;
-
-@Data
-@Getter
-@Setter
-@ApiModel("Inlong group heartbeats request")
-public class GroupHeartbeatPageRequest extends PageRequest {
-
-    @ApiModelProperty
-    private String component;
-
-    @ApiModelProperty
-    private String instance;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
deleted file mode 100644
index 79ec29a78..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-
-@Data
-@Getter
-@Setter
-@ApiModel("Inlong group heartbeat request")
-public class GroupHeartbeatRequest {
-
-    @ApiModelProperty
-    private String component;
-
-    @ApiModelProperty
-    private String instance;
-
-    @ApiModelProperty
-    private String inlongGroupId;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
index ed3ad688e..c5552ab82 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
@@ -17,33 +17,42 @@
 
 package org.apache.inlong.manager.common.pojo.heartbeat;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
 
+import java.util.Date;
+
+/**
+ * Inlong group heartbeat response
+ */
 @Data
-@Getter
-@Setter
 @ApiModel("Inlong group heartbeat response")
 public class GroupHeartbeatResponse {
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Primary key")
+    private Integer id;
+
+    @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
     private String component;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component instance, can be ip, name...")
     private String instance;
 
-    @ApiModelProperty
-    private long reportTime;
-
-    @ApiModelProperty
+    @ApiModelProperty(value = "Inlong group id")
     private String inlongGroupId;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Group status heartbeat")
     private String statusHeartbeat;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Group metric heartbeat")
     private String metricHeartbeat;
+
+    @ApiModelProperty(value = "Report time of heartbeat")
+    private Long reportTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatPageRequest.java
similarity index 69%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatPageRequest.java
index 08ead5382..ad7f4e865 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatPageRequest.java
@@ -20,23 +20,27 @@ package org.apache.inlong.manager.common.pojo.heartbeat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
+import lombok.EqualsAndHashCode;
 import org.apache.inlong.manager.common.beans.PageRequest;
 
-@Getter
-@Setter
+/**
+ * Base heartbeat page request
+ */
 @Data
-@ApiModel("Inlong stream heartbeats query request")
-public class StreamHeartbeatPageRequest
-        extends PageRequest {
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Base heartbeat page request")
+public class HeartbeatPageRequest extends PageRequest {
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
     private String component;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component instance, can be ip, name...")
     private String instance;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Inlong group id")
     private String inlongGroupId;
+
+    @ApiModelProperty(value = "Inlong stream id")
+    private String inlongStreamId;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatQueryRequest.java
similarity index 75%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatQueryRequest.java
index c5420a0b1..16afdeae5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatQueryRequest.java
@@ -20,24 +20,24 @@ package org.apache.inlong.manager.common.pojo.heartbeat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
 
-@Getter
-@Setter
+/**
+ * Heartbeat query request
+ */
 @Data
-@ApiModel("Inlong stream heartbeat query request")
-public class StreamHeartbeatRequest {
+@ApiModel("Heartbeat query request")
+public class HeartbeatQueryRequest {
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
     private String component;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component instance, can be ip, name...")
     private String instance;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Inlong group id")
     private String inlongGroupId;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Inlong stream id")
     private String inlongStreamId;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
index 7f9bed089..cdd262439 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
@@ -17,32 +17,38 @@
 
 package org.apache.inlong.manager.common.pojo.heartbeat;
 
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import javax.validation.constraints.NotBlank;
 import java.util.List;
-import lombok.Getter;
-import lombok.Setter;
 
-@Getter
-@Setter
+/**
+ * Request of heartbeat report
+ */
+@Data
+@ApiModel("Request of heartbeat report")
 public class HeartbeatReportRequest {
 
+    @NotBlank
+    @ApiModelProperty(value = "Component name, such as: Agent, Sort...", required = true)
     private String component;
 
+    @NotBlank
+    @ApiModelProperty(value = "Component instance, can be ip, name...", required = true)
     private String instance;
 
-    private long reportTimestamp;
+    @ApiModelProperty(value = "Report timestamp", required = true)
+    private Long reportTime;
 
-    /*
-     * component
-     */
+    @ApiModelProperty(value = "Component heartbeat info")
     private ComponentHeartbeat componentHeartbeat;
 
-    /*
-     * group
-     */
+    @ApiModelProperty(value = "Group heartbeat list")
     private List<GroupHeartbeat> groupHeartbeats;
 
-    /*
-     * stream
-     */
+    @ApiModelProperty(value = "Stream heartbeat list")
     private List<StreamHeartbeat> streamHeartbeats;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
index 391c62221..3f0b616f3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
@@ -17,18 +17,27 @@
 
 package org.apache.inlong.manager.common.pojo.heartbeat;
 
-import lombok.Getter;
-import lombok.Setter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-@Getter
-@Setter
+/**
+ * Inlong stream heartbeat info
+ */
+@Data
+@ApiModel("Inlong stream heartbeat info")
 public class StreamHeartbeat {
 
+    @ApiModelProperty(value = "Inlong group id")
     private String inlongGroupId;
 
+    @ApiModelProperty(value = "Inlong stream id")
     private String inlongStreamId;
 
+    @ApiModelProperty(value = "Status heartbeat info")
     private String statusHeartbeat;
 
+    @ApiModelProperty(value = "Metric heartbeat info")
     private String metricHeartbeat;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
index 4b808f090..4f2cbb31c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
@@ -17,36 +17,45 @@
 
 package org.apache.inlong.manager.common.pojo.heartbeat;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
 
-@Getter
-@Setter
+import java.util.Date;
+
+/**
+ * Inlong stream heartbeat response
+ */
 @Data
 @ApiModel("Inlong stream heartbeat response")
 public class StreamHeartbeatResponse {
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Primary key")
+    private Integer id;
+
+    @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
     private String component;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Component instance, can be ip, name...")
     private String instance;
 
-    @ApiModelProperty
-    private long reportTime;
-
-    @ApiModelProperty
+    @ApiModelProperty(value = "Inlong group id")
     private String inlongGroupId;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Inlong stream id")
     private String inlongStreamId;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Stream status heartbeat")
     private String statusHeartbeat;
 
-    @ApiModelProperty
+    @ApiModelProperty(value = "Stream metric heartbeat")
     private String metricHeartbeat;
+
+    @ApiModelProperty(value = "Report time of heartbeat")
+    private Long reportTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
index b2d8ccca5..b5406d1ec 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
@@ -17,23 +17,22 @@
 
 package org.apache.inlong.manager.dao.entity;
 
+import lombok.Data;
+
 import java.io.Serializable;
 import java.util.Date;
-import lombok.Data;
 
 @Data
 public class ComponentHeartbeatEntity implements Serializable {
-    private Integer id;
 
+    private static final long serialVersionUID = 1L;
+    private Integer id;
     private String component;
-
     private String instance;
-
-    private Date reportTime;
-
-    private Date modifyTime;
-
+    private String statusHeartbeat;
+    private String metricHeartbeat;
+    private Long reportTime;
     private Date createTime;
+    private Date modifyTime;
 
-    private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
deleted file mode 100644
index a9eaa8691..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import lombok.Data;
-
-@Data
-public class ComponentHeartbeatEntityWithBLOBs extends ComponentHeartbeatEntity implements Serializable {
-    private String statusHeartbeat;
-
-    private String metricHeartbeat;
-
-    private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
index 230202497..c054cef03 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
@@ -17,25 +17,23 @@
 
 package org.apache.inlong.manager.dao.entity;
 
+import lombok.Data;
+
 import java.io.Serializable;
 import java.util.Date;
-import lombok.Data;
 
 @Data
 public class GroupHeartbeatEntity implements Serializable {
-    private Integer id;
 
+    private static final long serialVersionUID = 1L;
+    private Integer id;
     private String component;
-
     private String instance;
-
     private String inlongGroupId;
-
-    private Date reportTime;
-
-    private Date modifyTime;
-
+    private String statusHeartbeat;
+    private String metricHeartbeat;
+    private Long reportTime;
     private Date createTime;
+    private Date modifyTime;
 
-    private static final long serialVersionUID = 1L;
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
deleted file mode 100644
index f747cde3f..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import lombok.Data;
-
-@Data
-public class GroupHeartbeatEntityWithBLOBs extends GroupHeartbeatEntity implements Serializable {
-
-    private String statusHeartbeat;
-
-    private String metricHeartbeat;
-
-    private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
index 8ee2255c9..6e74b885e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
@@ -17,27 +17,24 @@
 
 package org.apache.inlong.manager.dao.entity;
 
+import lombok.Data;
+
 import java.io.Serializable;
 import java.util.Date;
-import lombok.Data;
 
 @Data
 public class StreamHeartbeatEntity implements Serializable {
-    private Integer id;
 
+    private static final long serialVersionUID = 1L;
+    private Integer id;
     private String component;
-
     private String instance;
-
     private String inlongGroupId;
-
     private String inlongStreamId;
-
-    private Date reportTime;
-
-    private Date modifyTime;
-
+    private String statusHeartbeat;
+    private String metricHeartbeat;
+    private Long reportTime;
     private Date createTime;
+    private Date modifyTime;
 
-    private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
deleted file mode 100644
index 83ceb1633..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import lombok.Data;
-
-@Data
-public class StreamHeartbeatEntityWithBLOBs extends StreamHeartbeatEntity implements Serializable {
-    private String statusHeartbeat;
-
-    private String metricHeartbeat;
-
-    private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
index 2e653dc7e..8ebf1bceb 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
@@ -17,21 +17,26 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import java.util.List;
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
 
+@Repository
 public interface ComponentHeartbeatEntityMapper {
-    int deleteByPrimaryKey(Integer id);
 
-    int insert(ComponentHeartbeatEntityWithBLOBs record);
+    int insert(ComponentHeartbeatEntity record);
 
-    ComponentHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+    int insertOrUpdateByKey(ComponentHeartbeatEntity record);
 
-    ComponentHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
-            @Param("instance") String instance);
+    ComponentHeartbeatEntity selectByPrimaryKey(Integer id);
 
-    List<ComponentHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component);
+    ComponentHeartbeatEntity selectByKey(@Param("component") String component, @Param("instance") String instance);
+
+    List<ComponentHeartbeatEntity> selectByCondition(@Param("request") HeartbeatPageRequest request);
+
+    int deleteByPrimaryKey(Integer id);
 
-    int updateByKeyWithBLOBs(ComponentHeartbeatEntityWithBLOBs record);
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
index e37062e4e..cbe0a9ce9 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
@@ -17,23 +17,29 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import java.util.List;
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
 
+@Repository
 public interface GroupHeartbeatEntityMapper {
-    int deleteByPrimaryKey(Integer id);
 
-    int insert(GroupHeartbeatEntityWithBLOBs record);
+    int insert(GroupHeartbeatEntity record);
 
-    GroupHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+    int insertOrUpdateAll(@Param("component") String component, @Param("instance") String instance,
+            @Param("reportTime") Long reportTime, @Param("list") List<GroupHeartbeat> list);
 
-    GroupHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
-            @Param("instance") String instance,
+    GroupHeartbeatEntity selectByPrimaryKey(Integer id);
+
+    GroupHeartbeatEntity selectByKey(@Param("component") String component, @Param("instance") String instance,
             @Param("inlongGroupId") String inlongGroupId);
 
-    List<GroupHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
-            @Param("instance") String instance);
+    List<GroupHeartbeatEntity> selectByCondition(@Param("request") HeartbeatPageRequest request);
+
+    int deleteByPrimaryKey(Integer id);
 
-    int updateByKeyWithBLOBs(GroupHeartbeatEntityWithBLOBs record);
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
index 2ab8293e5..1af26e3ed 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
@@ -17,25 +17,29 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import java.util.List;
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
 
+@Repository
 public interface StreamHeartbeatEntityMapper {
-    int deleteByPrimaryKey(Integer id);
 
-    int insert(StreamHeartbeatEntityWithBLOBs record);
+    int insert(StreamHeartbeatEntity record);
 
-    StreamHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+    int insertOrUpdateAll(@Param("component") String component, @Param("instance") String instance,
+            @Param("reportTime") Long reportTime, @Param("list") List<StreamHeartbeat> list);
 
-    StreamHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
-            @Param("instance") String instance,
-            @Param("inlongGroupId") String inlongGroupId,
-            @Param("inlongStreamId") String inlongStreamId);
+    StreamHeartbeatEntity selectByPrimaryKey(Integer id);
 
-    List<StreamHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
-            @Param("instance") String instance,
-            @Param("inlongGroupId") String inlongGroupId);
+    StreamHeartbeatEntity selectByKey(@Param("component") String component, @Param("instance") String instance,
+            @Param("inlongGroupId") String inlongGroupId, @Param("inlongStreamId") String inlongStreamId);
+
+    List<StreamHeartbeatEntity> selectByCondition(@Param("request") HeartbeatPageRequest request);
+
+    int deleteByPrimaryKey(Integer id);
 
-    int updateByKeyWithBLOBs(StreamHeartbeatEntityWithBLOBs record);
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
index 508210e3d..abae727b1 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
@@ -1,83 +1,85 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
     Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
+    or more contributor license agreements. See the NOTICE file
     distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
+    regarding copyright ownership. The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
+    with the License. You may obtain a copy of the License at
 
       http://www.apache.org/licenses/LICENSE-2.0
 
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
+    KIND, either express or implied. See the License for the
     specific language governing permissions and limitations
     under the License.
 -->
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper">
-  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
-    <id column="id" jdbcType="INTEGER" property="id" />
-    <result column="component" jdbcType="VARCHAR" property="component" />
-    <result column="instance" jdbcType="VARCHAR" property="instance" />
-    <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
-    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
-    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
-  </resultMap>
-  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
-    <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
-    <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
-  </resultMap>
-  <sql id="Base_Column_List">
-    id, component, instance, report_time, modify_time, create_time
-  </sql>
-  <sql id="Blob_Column_List">
-    status_heartbeat, metric_heartbeat
-  </sql>
-  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
-    select
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from component_heartbeat
-    where id = #{id,jdbcType=INTEGER}
-  </select>
-  <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
-    select
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from component_heartbeat
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
-  </select>
-  <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
-    select
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from component_heartbeat
-    where component = #{component,jdbcType=VARCHAR}
-  </select>
-  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-    delete from component_heartbeat
-    where id = #{id,jdbcType=INTEGER}
-  </delete>
-  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
-    insert into component_heartbeat (component, instance,
-      report_time, status_heartbeat, metric_heartbeat)
-    values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
-      #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
-  </insert>
-  <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
-    update component_heartbeat
-    set
-      report_time = #{reportTime,jdbcType=TIMESTAMP},
-      status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
-      metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
-  </update>
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="component" jdbcType="VARCHAR" property="component"/>
+        <result column="instance" jdbcType="VARCHAR" property="instance"/>
+        <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat"/>
+        <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat"/>
+        <result column="report_time" jdbcType="BIGINT" property="reportTime"/>
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+    </resultMap>
+    <sql id="Base_Column_List">
+        id, component, instance, status_heartbeat, metric_heartbeat, report_time, create_time, modify_time
+    </sql>
+
+    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+        insert into component_heartbeat (component, instance,
+                                         status_heartbeat, metric_heartbeat,
+                                         report_time)
+        values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+                #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR},
+                #{reportTime,jdbcType=BIGINT})
+    </insert>
+    <insert id="insertOrUpdateByKey" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+        insert into component_heartbeat (component, instance,
+                                         status_heartbeat, metric_heartbeat,
+                                         report_time)
+        values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+                #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR},
+                #{reportTime,jdbcType=BIGINT})
+        ON DUPLICATE KEY UPDATE status_heartbeat = values(status_heartbeat),
+                                metric_heartbeat = values(metric_heartbeat),
+                                report_time      = values(report_time)
+    </insert>
+
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from component_heartbeat
+        where id = #{id,jdbcType=INTEGER}
+    </select>
+    <select id="selectByKey" parameterType="java.lang.String" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from component_heartbeat
+        where component = #{component,jdbcType=VARCHAR}
+        and instance = #{instance,jdbcType=VARCHAR}
+    </select>
+    <select id="selectByCondition"
+            parameterType="org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest"
+            resultMap="BaseResultMap"><!-- explicit column-to-property mapping, same as selectByKey -->
+        select
+        <include refid="Base_Column_List"/>
+        from component_heartbeat
+        where component = #{request.component, jdbcType=VARCHAR}
+        order by modify_time desc
+    </select>
+
+    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+        delete
+        from component_heartbeat
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
index bbba5e324..fa258eb47 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
@@ -1,86 +1,100 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
     Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
+    or more contributor license agreements. See the NOTICE file
     distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
+    regarding copyright ownership. The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
+    with the License. You may obtain a copy of the License at
 
       http://www.apache.org/licenses/LICENSE-2.0
 
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
+    KIND, either express or implied. See the License for the
     specific language governing permissions and limitations
     under the License.
 -->
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper">
-  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
-    <id column="id" jdbcType="INTEGER" property="id" />
-    <result column="component" jdbcType="VARCHAR" property="component" />
-    <result column="instance" jdbcType="VARCHAR" property="instance" />
-    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
-    <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
-    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
-    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
-  </resultMap>
-  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
-    <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
-    <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
-  </resultMap>
-  <sql id="Base_Column_List">
-    id, component, instance, inlong_group_id, report_time, modify_time, create_time
-  </sql>
-  <sql id="Blob_Column_List">
-    status_heartbeat, metric_heartbeat
-  </sql>
-  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
-    select 
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from group_heartbeat
-    where id = #{id,jdbcType=INTEGER}
-  </select>
-  <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
-    select
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from group_heartbeat
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
-  </select>
-  <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
-    select
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from group_heartbeat
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
-  </select>
-  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-    delete from group_heartbeat
-    where id = #{id,jdbcType=INTEGER}
-  </delete>
-  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
-    insert into group_heartbeat (component, instance,
-      inlong_group_id, report_time, status_heartbeat, metric_heartbeat
-      )
-    values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
-      #{inlongGroupId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR}
-      )
-  </insert>
-  <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
-    update group_heartbeat
-    set
-      report_time = #{reportTime,jdbcType=TIMESTAMP},
-      status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
-      metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
-  </update>
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="component" jdbcType="VARCHAR" property="component"/>
+        <result column="instance" jdbcType="VARCHAR" property="instance"/>
+        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat"/>
+        <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat"/>
+        <result column="report_time" jdbcType="BIGINT" property="reportTime"/>
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+    </resultMap>
+    <sql id="Base_Column_List">
+        id, component, instance, inlong_group_id, status_heartbeat, metric_heartbeat,
+        report_time, create_time, modify_time
+    </sql>
+
+    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
+        insert into group_heartbeat (component, instance,
+                                     inlong_group_id, status_heartbeat,
+                                     metric_heartbeat, report_time)
+        values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+                #{inlongGroupId,jdbcType=VARCHAR}, #{statusHeartbeat,jdbcType=LONGVARCHAR},
+                #{metricHeartbeat,jdbcType=LONGVARCHAR}, #{reportTime,jdbcType=BIGINT})
+    </insert>
+    <insert id="insertOrUpdateAll" parameterType="java.util.List">
+        insert into group_heartbeat (component, instance,
+        inlong_group_id, status_heartbeat,
+        metric_heartbeat, report_time)
+        values
+        <foreach collection="list" index="index" item="item" open="" close="" separator=",">
+            (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+            #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.statusHeartbeat,jdbcType=LONGVARCHAR},
+            #{item.metricHeartbeat,jdbcType=LONGVARCHAR}, #{reportTime,jdbcType=BIGINT})
+        </foreach>
+        ON DUPLICATE KEY UPDATE
+        status_heartbeat = values(status_heartbeat),
+        metric_heartbeat = values(metric_heartbeat),
+        report_time = values(report_time)
+    </insert>
+
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from group_heartbeat
+        where id = #{id,jdbcType=INTEGER}
+    </select>
+    <select id="selectByKey" parameterType="java.lang.String" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from group_heartbeat
+        where component = #{component,jdbcType=VARCHAR}
+        and instance = #{instance,jdbcType=VARCHAR}
+        and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+    </select>
+    <select id="selectByCondition"
+            parameterType="org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest"
+            resultMap="BaseResultMap"><!-- explicit column-to-property mapping, same as selectByKey -->
+        select
+        <include refid="Base_Column_List"/>
+        from group_heartbeat
+        <where>
+            component = #{request.component, jdbcType=VARCHAR}
+            <if test="request.inlongGroupId != null and request.inlongGroupId != ''">
+                and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
+            </if>
+            <if test="request.instance != null and request.instance != ''">
+                and instance = #{request.instance, jdbcType=VARCHAR}
+            </if>
+        </where><!-- ORDER BY is not a predicate, so it is placed after the where element -->
+        order by modify_time desc
+    </select>
+
+    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+        delete
+        from group_heartbeat
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
index c3762875e..a22bfdcaf 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
@@ -1,87 +1,107 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
     Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
+    or more contributor license agreements. See the NOTICE file
     distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
+    regarding copyright ownership. The ASF licenses this file
     to you under the Apache License, Version 2.0 (the
     "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
+    with the License. You may obtain a copy of the License at
 
       http://www.apache.org/licenses/LICENSE-2.0
 
     Unless required by applicable law or agreed to in writing,
     software distributed under the License is distributed on an
     "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
+    KIND, either express or implied. See the License for the
     specific language governing permissions and limitations
     under the License.
 -->
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper">
-  <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
-    <id column="id" jdbcType="INTEGER" property="id" />
-    <result column="component" jdbcType="VARCHAR" property="component" />
-    <result column="instance" jdbcType="VARCHAR" property="instance" />
-    <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
-    <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
-    <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
-    <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
-    <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
-  </resultMap>
-  <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
-    <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
-    <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
-  </resultMap>
-  <sql id="Base_Column_List">
-    id, component, instance, inlong_group_id, inlong_stream_id, report_time, modify_time, 
-    create_time
-  </sql>
-  <sql id="Blob_Column_List">
-    status_heartbeat, metric_heartbeat
-  </sql>
-  <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
-    select 
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from stream_heartbeat
-    where id = #{id,jdbcType=INTEGER}
-  </select>
-  <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
-    select
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from stream_heartbeat
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
-  </select>
-  <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
-    select
-    <include refid="Base_Column_List" />
-    ,
-    <include refid="Blob_Column_List" />
-    from stream_heartbeat
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
-  </select>
-  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-    delete from stream_heartbeat
-    where id = #{id,jdbcType=INTEGER}
-  </delete>
-  <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
-    insert into stream_heartbeat (component, instance,
-      inlong_group_id, inlong_stream_id, report_time, status_heartbeat, metric_heartbeat)
-    values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
-      #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, 
-      #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
-  </insert>
-  <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
-    update stream_heartbeat
-    set
-      report_time = #{reportTime,jdbcType=TIMESTAMP},
-      status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
-      metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
-    where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
-  </update>
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="component" jdbcType="VARCHAR" property="component"/>
+        <result column="instance" jdbcType="VARCHAR" property="instance"/>
+        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+        <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat"/>
+        <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat"/>
+        <result column="report_time" jdbcType="BIGINT" property="reportTime"/>
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+    </resultMap>
+    <sql id="Base_Column_List">
+        id, component, instance, inlong_group_id, inlong_stream_id, status_heartbeat, metric_heartbeat,
+        report_time, create_time, modify_time
+    </sql>
+
+    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+        insert into stream_heartbeat (component, instance,
+                                      inlong_group_id, inlong_stream_id,
+                                      status_heartbeat, metric_heartbeat,
+                                      report_time, create_time)
+        values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+                #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+                #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR},
+                #{reportTime,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP})
+    </insert>
+    <insert id="insertOrUpdateAll" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+        insert into stream_heartbeat (component, instance,
+        inlong_group_id, inlong_stream_id,
+        status_heartbeat, metric_heartbeat,
+        report_time)
+        values
+        <foreach collection="list" index="index" item="item" open="" close="" separator=",">
+            (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+            #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.inlongStreamId,jdbcType=VARCHAR},
+            #{item.statusHeartbeat,jdbcType=LONGVARCHAR}, #{item.metricHeartbeat,jdbcType=LONGVARCHAR},
+            #{reportTime,jdbcType=BIGINT})
+        </foreach>
+        ON DUPLICATE KEY UPDATE
+        status_heartbeat = values(status_heartbeat),
+        metric_heartbeat = values(metric_heartbeat),
+        report_time = values(report_time)
+    </insert>
+
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_heartbeat
+        where id = #{id,jdbcType=INTEGER}
+    </select>
+    <select id="selectByKey" parameterType="java.lang.String" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_heartbeat
+        where component = #{component,jdbcType=VARCHAR}
+        and instance = #{instance,jdbcType=VARCHAR}
+        and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+        and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+    </select>
+    <!-- Lists stream heartbeats of one component and group, optionally narrowed by instance and stream id, newest modify_time first -->
+    <select id="selectByCondition"
+            parameterType="org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest"
+            resultMap="BaseResultMap">
+        select <include refid="Base_Column_List"/>
+        from stream_heartbeat
+        <where>
+            component = #{request.component, jdbcType=VARCHAR}
+            and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
+            <if test="request.instance != null and request.instance != ''">
+                and instance = #{request.instance, jdbcType=VARCHAR}
+            </if>
+            <if test="request.inlongStreamId != null and request.inlongStreamId != ''">
+                and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
+            </if>
+        </where>
+        order by modify_time desc
+    </select>
+
+    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+        delete
+        from stream_heartbeat
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
 </mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
index ea8acf4aa..4dfe64752 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
@@ -20,7 +20,9 @@ package org.apache.inlong.manager.service.core;
 import com.github.pagehelper.PageInfo;
 import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
 import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
 import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
 import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
 
 /**
@@ -29,73 +31,59 @@ import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
 public interface HeartbeatService {
 
     /**
-     * report Heartbeat
+     * Report heartbeat for inlong component, such as Agent, Sort, etc.
+     *
      * @param request request
-     * @return
+     * @return whether the heartbeat was saved successfully
      */
-    String reportHeartbeatInfo(HeartbeatReportRequest request);
+    Boolean reportHeartbeat(HeartbeatReportRequest request);
 
     /**
-     * get Component HeartbeatInfo
-     * @param component component
-     * @param instance instance
-     * @return ComponentHeartBeatResponse
+     * Get component heartbeat
+     *
+     * @param request query request of heartbeat
+     * @return component heartbeat
      */
-    ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
-            String instance);
+    ComponentHeartbeatResponse getComponentHeartbeat(HeartbeatQueryRequest request);
 
     /**
-     * get Group HeartbeatInfo
-     * @param component component
-     * @param instance instance
-     * @param inlongGroupId inlongGroupId
-     * @return  GroupHeartbeatResponse
+     * Get inlong group heartbeat
+     *
+     * @param request query request of heartbeat
+     * @return group heartbeat
      */
-    GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
-            String instance, String inlongGroupId);
+    GroupHeartbeatResponse getGroupHeartbeat(HeartbeatQueryRequest request);
 
     /**
-     * get Stream HeartbeatInfo
-     * @param component component
-     * @param instance instance
-     * @param inlongGroupId inlongGroupId
-     * @param inlongStreamId inlongStreamId
-     * @return StreamHeartBeatResponse
+     * Get inlong stream heartbeat
+     *
+     * @param request query request of heartbeat
+     * @return stream heartbeat
      */
-    StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
-            String instance, String inlongGroupId, String inlongStreamId);
+    StreamHeartbeatResponse getStreamHeartbeat(HeartbeatQueryRequest request);
 
     /**
-     * get Component HeartbeatInfos by page
-     * @param component  component
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return one page of ComponentHeartBeatResponse
+     * List component heartbeat by page
+     *
+     * @param request paging query request
+     * @return list of component heartbeat
      */
-    PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component, int pageNum,
-            int pageSize);
+    PageInfo<ComponentHeartbeatResponse> listComponentHeartbeat(HeartbeatPageRequest request);
 
     /**
-     * get Group HeartbeatInfos by page
-     * @param component component
-     * @param instance instance
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return one page of GroupHeartbeatResponse
+     * List group heartbeat by page
+     *
+     * @param request paging query request
+     * @return list of group heartbeat
      */
-    PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
-            String instance, int pageNum, int pageSize);
+    PageInfo<GroupHeartbeatResponse> listGroupHeartbeat(HeartbeatPageRequest request);
 
     /**
-     * get Stream HeartbeatInfos
-     * @param component component
-     * @param instance instance
-     * @param inlongGroupId inlongGroupId
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return one page of GroupHeartbeatResponse
+     * List stream heartbeat by page
+     *
+     * @param request paging query request
+     * @return list of stream heartbeat
      */
-    PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
-            String instance, String inlongGroupId, int pageNum, int pageSize);
+    PageInfo<StreamHeartbeatResponse> listStreamHeartbeat(HeartbeatPageRequest request);
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
index 3b46c771f..07b745615 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
@@ -21,23 +21,25 @@ import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import com.google.gson.Gson;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
 import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
 import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
 import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
 import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
 import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
 import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
 import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
 import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
@@ -47,454 +49,238 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+
 /**
- *  report or query heartbeat info
+ * Heartbeat service layer implementation
  */
 @Service
-public class HeartbeatServiceImpl
-        implements HeartbeatService {
+public class HeartbeatServiceImpl implements HeartbeatService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatServiceImpl.class);
+    private static final Gson GSON = new Gson();
 
     @Autowired
-    private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
-
+    private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
     @Autowired
-    private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
-
+    private GroupHeartbeatEntityMapper groupHeartbeatMapper;
     @Autowired
-    private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
+    private StreamHeartbeatEntityMapper streamHeartbeatMapper;
 
-    private static Gson gson = new Gson();
-
-    /**
-     * heartbeat common handler
-     * @param request request
-     * @return
-     */
     @Override
-    public String reportHeartbeatInfo(HeartbeatReportRequest request) {
-        if (request != null && StringUtils.isNotEmpty(request.getComponent())) {
-            ComponentTypeEnum componentType =
-                    ComponentTypeEnum.valueOf(request.getComponent());
-            switch (componentType) {
-                case Sort:
-                case DataProxy:
-                case Agent:
-                case Cache:
-                default:
-                    updateByDefaultWay(request);
-            }
-        } else {
-            LOGGER.warn("request is null or component [{}] is not supported",
-                    request.getComponent());
+    public Boolean reportHeartbeat(HeartbeatReportRequest request) {
+        if (request == null || StringUtils.isBlank(request.getComponent())) {
+            LOGGER.warn("request is null or component null, just return");
+            return false;
         }
-        return "Success";
-    }
-
-    /**
-     * update Heartbeat Data
-     * @param request  request
-     */
-    private void updateComponentHeartbeatData(HeartbeatReportRequest request) {
-        if (request == null || request.getComponentHeartbeat() == null) {
-            return;
-        }
-        ComponentHeartbeatEntityWithBLOBs entity = new ComponentHeartbeatEntityWithBLOBs();
-        entity.setComponent(request.getComponent());
-        entity.setInstance(request.getInstance());
-        entity.setReportTime(new Date(request.getReportTimestamp()));
-        if (StringUtils.isNotEmpty(request.getComponentHeartbeat().getStatusHeartbeat())) {
-            entity.setStatusHeartbeat(request.getComponentHeartbeat().getStatusHeartbeat());
-        }
-        if (StringUtils.isNotEmpty(request.getComponentHeartbeat().getMetricHeartbeat())) {
-            entity.setMetricHeartbeat(request.getComponentHeartbeat().getMetricHeartbeat());
-        }
-        int count = componentHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
-        if (count == 0) {
-            componentHeartbeatEntityMapper.insert(entity);
-        }
-    }
-
-    /**
-     * update Component StaticData
-     * @param request request
-     */
-    private void updateGroupHeartbeatData(HeartbeatReportRequest request) {
-        List<GroupHeartbeat> list = request.getGroupHeartbeats();
-        if (list != null) {
-            for (GroupHeartbeat info : list) {
-                GroupHeartbeatEntityWithBLOBs entity = new GroupHeartbeatEntityWithBLOBs();
-                entity.setComponent(request.getComponent());
-                entity.setInstance(request.getInstance());
-                entity.setReportTime(new Date(request.getReportTimestamp()));
-                entity.setInlongGroupId(info.getInlongGroupId());
-                entity.setMetricHeartbeat(info.getMetricHeartbeat());
-                entity.setStatusHeartbeat(info.getStatusHeartbeat());
-                int count = groupHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
-                if (count == 0) {
-                    groupHeartbeatEntityMapper.insert(entity);
-                }
-            }
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("received heartbeat: " + request);
         }
-    }
 
-    /**
-     * update Stream Status Data
-     * @param request request
-     */
-    private void updateStreamHeartBeatData(HeartbeatReportRequest request) {
-        List<StreamHeartbeat> list = request.getStreamHeartbeats();
-        if (list != null) {
-            for (StreamHeartbeat info : list) {
-                StreamHeartbeatEntityWithBLOBs entity = new StreamHeartbeatEntityWithBLOBs();
-                entity.setComponent(request.getComponent());
-                entity.setInstance(request.getInstance());
-                entity.setReportTime(new Date(request.getReportTimestamp()));
-                entity.setInlongGroupId(info.getInlongGroupId());
-                entity.setInlongStreamId(info.getInlongStreamId());
-                entity.setMetricHeartbeat(info.getMetricHeartbeat());
-                entity.setStatusHeartbeat(info.getStatusHeartbeat());
-                int count = streamHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
-                if (count == 0) {
-                    streamHeartbeatEntityMapper.insert(entity);
-                }
-            }
+        ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(request.getComponent());
+        switch (componentType) {
+            case Sort:
+            case DataProxy:
+            case Agent:
+            case Cache:
+                return updateHeartbeatOpt(request);
+            default:
+                throw new BusinessException("Unsupported component type for " + request.getComponent());
         }
     }
 
-    /**
-     * get heartbeat info
-     * @param component componentTypeName
-     * @param instance instanceIdentifier
-     * @return heartbeat info
-     */
     @Override
-    public ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
-            String instance) {
-        if (component != null && StringUtils.isNotEmpty(component)) {
-            ComponentTypeEnum componentType =
-                    ComponentTypeEnum.valueOf(component);
-            switch (componentType) {
-                case Sort:
-                case DataProxy:
-                case Agent:
-                case Cache:
-                default:
-                    return getComponentHeartbeatInfoByDefaultWay(component, instance);
-            }
-        } else {
-            LOGGER.warn("request is null or component type is null");
+    public ComponentHeartbeatResponse getComponentHeartbeat(HeartbeatQueryRequest request) {
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+        String component = request.getComponent();
+        Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+        Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+
+        ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+        switch (componentType) {
+            case Sort:
+            case DataProxy:
+            case Agent:
+            case Cache:
+                ComponentHeartbeatEntity res = componentHeartbeatMapper.selectByKey(component, request.getInstance());
+                return CommonBeanUtils.copyProperties(res, ComponentHeartbeatResponse::new);
+            default:
+                throw new BusinessException("Unsupported component type for " + component);
         }
-        return null;
     }
 
-    /**
-     * get heartbeat static info
-     * @param component componentTypeName
-     * @param instance instanceIdentifier
-     * @param inlongGroupId inlongGroupId
-     * @return heartbeatStaticInfoResponse
-     */
     @Override
-    public GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
-            String instance, String inlongGroupId) {
-        if (component != null && StringUtils.isNotEmpty(component)) {
-            ComponentTypeEnum componentType =
-                    ComponentTypeEnum.valueOf(component);
-            switch (componentType) {
-                case Sort:
-                case DataProxy:
-                case Agent:
-                case Cache:
-                default:
-                    return getGroupHeartbeatByDefaultWay(component, instance, inlongGroupId);
-            }
-        } else {
-            LOGGER.warn("request is null or component type is null");
+    public GroupHeartbeatResponse getGroupHeartbeat(HeartbeatQueryRequest request) {
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+        String component = request.getComponent();
+        Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+        Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+        Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
+        ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+        switch (componentType) {
+            case Sort:
+            case DataProxy:
+            case Agent:
+            case Cache:
+                GroupHeartbeatEntity result = groupHeartbeatMapper.selectByKey(component, request.getInstance(),
+                        request.getInlongGroupId());
+                return CommonBeanUtils.copyProperties(result, GroupHeartbeatResponse::new);
+            default:
+                throw new BusinessException("Unsupported component type for " + component);
         }
-        return null;
     }
 
-    /**
-     * get heartbeat status info
-     * @param component componentTypeName
-     * @param instance instanceIdentifier
-     * @param inlongGroupId inlongGroupId
-     * @param inlongStreamId inlongStreamId
-     * @return heartbeatStatusInfoResponse
-     */
     @Override
-    public StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
-            String instance, String inlongGroupId, String inlongStreamId) {
-        if (component != null && StringUtils.isNotEmpty(component)) {
-            ComponentTypeEnum componentType =
-                    ComponentTypeEnum.valueOf(component);
-            switch (componentType) {
-                case Sort:
-                case DataProxy:
-                case Agent:
-                case Cache:
-                default:
-                    return getStreamHeartbeatByDefaultWay(component, instance,
-                            inlongGroupId, inlongStreamId);
-            }
-        } else {
-            LOGGER.warn("request is null or component type is null");
+    public StreamHeartbeatResponse getStreamHeartbeat(HeartbeatQueryRequest request) {
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+        String component = request.getComponent();
+        Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+        Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+        Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+        Preconditions.checkNotEmpty(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+        ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+        switch (componentType) {
+            case Sort:
+            case DataProxy:
+            case Agent:
+            case Cache:
+                StreamHeartbeatEntity result = streamHeartbeatMapper.selectByKey(component, request.getInstance(),
+                        request.getInlongGroupId(), request.getInlongStreamId());
+                return CommonBeanUtils.copyProperties(result, StreamHeartbeatResponse::new);
+            default:
+                throw new BusinessException("Unsupported component type for " + component);
         }
-        return null;
     }
 
-    /**
-     * get component heartbeat infos
-     * @param component  component
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return pageInfos
-     */
     @Override
-    public PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component,
-            int pageNum, int pageSize) {
-        if (component != null && StringUtils.isNotEmpty(component)) {
-            ComponentTypeEnum componentType =
-                    ComponentTypeEnum.valueOf(component);
-            switch (componentType) {
-                case Sort:
-                case DataProxy:
-                case Agent:
-                case Cache:
-                default:
-                    return getComponentHeartbeatInfosByDefaultWay(component, pageNum, pageSize);
-            }
-        } else {
-            LOGGER.warn("request is null or component type is null");
+    public PageInfo<ComponentHeartbeatResponse> listComponentHeartbeat(HeartbeatPageRequest request) {
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+        String component = request.getComponent();
+        Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+
+        ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+        switch (componentType) {
+            case Sort:
+            case DataProxy:
+            case Agent:
+            case Cache:
+                return listComponentHeartbeatOpt(request);
+            default:
+                throw new BusinessException("Unsupported component type for " + component);
         }
-        return null;
     }
 
-    /**
-     * get group heartbeat infos
-     * @param component component
-     * @param instance instance
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return pageInfo
-     */
     @Override
-    public PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
-            String instance, int pageNum, int pageSize) {
-        if (component != null && StringUtils.isNotEmpty(component)) {
-            ComponentTypeEnum componentType =
-                    ComponentTypeEnum.valueOf(component);
-            switch (componentType) {
-                case Sort:
-                case DataProxy:
-                case Agent:
-                case Cache:
-                default:
-                    return getGroupHeartbeatsByDefaultWay(component, instance,
-                            pageNum, pageSize);
-            }
-        } else {
-            LOGGER.warn("request is null or component type is null");
+    public PageInfo<GroupHeartbeatResponse> listGroupHeartbeat(HeartbeatPageRequest request) {
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+        String component = request.getComponent();
+        Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+
+        ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+        switch (componentType) {
+            case Sort:
+            case DataProxy:
+            case Agent:
+            case Cache:
+                return listGroupHeartbeatOpt(request);
+            default:
+                throw new BusinessException("Unsupported component type for " + component);
         }
-        return null;
     }
 
-    /**
-     * get stream heartbeat infos
-     * @param component component
-     * @param instance instance
-     * @param inlongGroupId inlongGroupId
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return pageInfo
-     */
     @Override
-    public PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
-            String instance, String inlongGroupId, int pageNum, int pageSize) {
-        if (component != null && StringUtils.isNotEmpty(component)) {
-            ComponentTypeEnum componentType =
-                    ComponentTypeEnum.valueOf(component);
-            switch (componentType) {
-                case Sort:
-                case DataProxy:
-                case Agent:
-                case Cache:
-                default:
-                    return getStreamHeartbeatsByDefaultWay(component, instance,
-                            inlongGroupId, pageNum, pageSize);
-            }
-        } else {
-            LOGGER.warn("request is null or component type is null");
+    public PageInfo<StreamHeartbeatResponse> listStreamHeartbeat(HeartbeatPageRequest request) {
+        Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+        String component = request.getComponent();
+        Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+        Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
+        ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+        switch (componentType) {
+            case Sort:
+            case DataProxy:
+            case Agent:
+            case Cache:
+                return listStreamHeartbeatOpt(request);
+            default:
+                throw new BusinessException("Unsupported component type for " + component);
         }
-        return null;
     }
 
     /**
-     * update By DefaultWay
-     * @param request request
+     * Default implementation for updating heartbeat
      */
-    private void updateByDefaultWay(HeartbeatReportRequest request) {
+    private Boolean updateHeartbeatOpt(HeartbeatReportRequest request) {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("InlongHeartbeatReportRequest json = {}", gson.toJson(request));
+            LOGGER.debug("heartbeat request json = {}", GSON.toJson(request));
         }
-        updateComponentHeartbeatData(request);
-        updateGroupHeartbeatData(request);
-        updateStreamHeartBeatData(request);
-    }
+        String component = request.getComponent();
+        String instance = request.getInstance();
+        Long reportTime = request.getReportTime();
 
-    /**
-     * get Heartbeat InfoByDefaultWay
-     * @param component component
-     * @param instance instance
-     * @return HeartbeatInfoResponse
-     */
-    private ComponentHeartbeatResponse getComponentHeartbeatInfoByDefaultWay(String component,
-            String instance) {
-        ComponentHeartbeatEntityWithBLOBs result =
-                componentHeartbeatEntityMapper.selectByKey(component, instance);
-        ComponentHeartbeatResponse componentHeartBeatResponse = null;
-        if (result != null) {
-            componentHeartBeatResponse = new ComponentHeartbeatResponse();
-            componentHeartBeatResponse.setComponent(result.getComponent());
-            componentHeartBeatResponse.setInstance(result.getInstance());
-            componentHeartBeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
-            componentHeartBeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
-            componentHeartBeatResponse.setReportTime(result.getReportTime().getTime());
+        // Add component heartbeats
+        ComponentHeartbeat componentHeartbeat = request.getComponentHeartbeat();
+        if (componentHeartbeat != null) {
+            ComponentHeartbeatEntity entity = new ComponentHeartbeatEntity();
+            entity.setComponent(component);
+            entity.setInstance(instance);
+            entity.setReportTime(reportTime);
+            entity.setStatusHeartbeat(componentHeartbeat.getStatusHeartbeat());
+            entity.setMetricHeartbeat(componentHeartbeat.getMetricHeartbeat());
+            componentHeartbeatMapper.insertOrUpdateByKey(entity);
         }
-        return componentHeartBeatResponse;
-    }
 
-    /**
-     * get heartbeat StaticInfo ByDefaultWay
-     * @param component component
-     * @param instance instance
-     * @param inlongGroupId inlongGroupId
-     * @return heartbeatStaticInfoResponse
-     */
-    private GroupHeartbeatResponse getGroupHeartbeatByDefaultWay(String component,
-            String instance, String inlongGroupId) {
-
-        GroupHeartbeatEntityWithBLOBs result =
-                groupHeartbeatEntityMapper.selectByKey(component,
-                        instance, inlongGroupId);
-        GroupHeartbeatResponse groupHeartbeatResponse = null;
-        if (result != null) {
-            groupHeartbeatResponse = new GroupHeartbeatResponse();
-            groupHeartbeatResponse.setInlongGroupId(inlongGroupId);
-            groupHeartbeatResponse.setComponent(result.getComponent());
-            groupHeartbeatResponse.setInstance(result.getInstance());
-            groupHeartbeatResponse.setReportTime(result.getReportTime().getTime());
-            groupHeartbeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
-            groupHeartbeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
+        // Add group heartbeats
+        List<GroupHeartbeat> groupHeartbeats = request.getGroupHeartbeats();
+        if (CollectionUtils.isNotEmpty(groupHeartbeats)) {
+            groupHeartbeatMapper.insertOrUpdateAll(component, instance, reportTime, groupHeartbeats);
         }
-        return groupHeartbeatResponse;
-    }
 
-    /**
-     * get Heartbeat StatusInfo ByDefaultWay
-     * @param componentTypeName componentTypeName
-     * @param instanceIdentifier instanceIdentifier
-     * @param inlongGroupId inlongGroupId
-     * @return heartbeatStatusInfoResponse
-     */
-    private StreamHeartbeatResponse getStreamHeartbeatByDefaultWay(String componentTypeName,
-            String instanceIdentifier, String inlongGroupId, String inlongStreamId) {
-        StreamHeartbeatEntityWithBLOBs result =
-                streamHeartbeatEntityMapper.selectByKey(componentTypeName,
-                        instanceIdentifier, inlongGroupId, inlongStreamId);
-        StreamHeartbeatResponse streamHeartBeatResponse = null;
-        if (result != null) {
-            streamHeartBeatResponse = new StreamHeartbeatResponse();
-            streamHeartBeatResponse.setComponent(result.getComponent());
-            streamHeartBeatResponse.setInstance(result.getInstance());
-            streamHeartBeatResponse.setReportTime(result.getReportTime().getTime());
-            streamHeartBeatResponse.setInlongGroupId(result.getInlongGroupId());
-            streamHeartBeatResponse.setInlongStreamId(result.getInlongStreamId());
-            streamHeartBeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
-            streamHeartBeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
+        // Add stream heartbeats
+        List<StreamHeartbeat> streamHeartbeats = request.getStreamHeartbeats();
+        if (CollectionUtils.isNotEmpty(streamHeartbeats)) {
+            streamHeartbeatMapper.insertOrUpdateAll(component, instance, reportTime, streamHeartbeats);
         }
-        return streamHeartBeatResponse;
+
+        return true;
     }
 
-    /**
-     * get Heartbeat InfoByDefaultWay
-     * @param component component
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return HeartbeatInfoResponse
-     */
-    private PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfosByDefaultWay(String component,
-            int pageNum, int pageSize) {
-        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
-        PageHelper.startPage(pageNum, pageSize);
-
-        Page<ComponentHeartbeatEntityWithBLOBs> entityPage = (Page<ComponentHeartbeatEntityWithBLOBs>)
-                componentHeartbeatEntityMapper.selectHeartbeats(component);
-
-        List<ComponentHeartbeatResponse> componentHeartBeatResponses = new ArrayList<>();
-        PageInfo<ComponentHeartbeatResponse> pageInfo;
-        if (entityPage != null) {
-            componentHeartBeatResponses = CommonBeanUtils
-                    .copyListProperties(entityPage, ComponentHeartbeatResponse::new);
-        }
-        pageInfo = new PageInfo<>(componentHeartBeatResponses);
-        pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+    private PageInfo<ComponentHeartbeatResponse> listComponentHeartbeatOpt(HeartbeatPageRequest request) {
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        Page<ComponentHeartbeatEntity> entityPage = (Page<ComponentHeartbeatEntity>)
+                componentHeartbeatMapper.selectByCondition(request);
+        List<ComponentHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
+                ComponentHeartbeatResponse::new);
+        // The copied list is a plain List, so the total must come from the queried Page, not from this page's size
+        PageInfo<ComponentHeartbeatResponse> pageInfo = new PageInfo<>(responseList);
+        pageInfo.setTotal(entityPage.getTotal());
         return pageInfo;
     }
 
-    /**
-     * get heartbeat StaticInfo ByDefaultWay
-     * @param component component
-     * @param instance instance
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return heartbeatStaticInfoResponse
-     */
-    private PageInfo<GroupHeartbeatResponse> getGroupHeartbeatsByDefaultWay(String component,
-            String instance, int pageNum, int pageSize) {
-        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
-        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
-        PageHelper.startPage(pageNum, pageSize);
-        Page<GroupHeartbeatEntityWithBLOBs> entityPage = (Page<GroupHeartbeatEntityWithBLOBs>)
-                groupHeartbeatEntityMapper.selectHeartbeats(component, instance);
-        List<GroupHeartbeatResponse>  groupHeartbeatResponses = new ArrayList<>();
-        PageInfo<GroupHeartbeatResponse> pageInfo;
-        if (entityPage != null) {
-            groupHeartbeatResponses = CommonBeanUtils
-                    .copyListProperties(entityPage, GroupHeartbeatResponse::new);
-        }
-        pageInfo = new PageInfo<>(groupHeartbeatResponses);
-        pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+    private PageInfo<GroupHeartbeatResponse> listGroupHeartbeatOpt(HeartbeatPageRequest request) {
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        Page<GroupHeartbeatEntity> entityPage = (Page<GroupHeartbeatEntity>)
+                groupHeartbeatMapper.selectByCondition(request);
+        List<GroupHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
+                GroupHeartbeatResponse::new);
+        // The copied list is a plain List, so the total must come from the queried Page, not from this page's size
+        PageInfo<GroupHeartbeatResponse> pageInfo = new PageInfo<>(responseList);
+        pageInfo.setTotal(entityPage.getTotal());
         return pageInfo;
     }
 
-    /**
-     * get Heartbeat StatusInfo ByDefaultWay
-     * @param component component
-     * @param instance instance
-     * @param inlongGroupId inlongGroupId
-     * @param pageNum pageNum
-     * @param pageSize pageSize
-     * @return heartbeatStatusInfoResponse
-     */
-    private PageInfo<StreamHeartbeatResponse> getStreamHeartbeatsByDefaultWay(String component,
-            String instance, String inlongGroupId, int pageNum, int pageSize) {
-        Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
-        Preconditions.checkNotNull(instance, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
-        Preconditions.checkNotNull(inlongGroupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
-        PageHelper.startPage(pageNum, pageSize);
-        Page<StreamHeartbeatEntityWithBLOBs> entityPage = (Page<StreamHeartbeatEntityWithBLOBs>)
-                streamHeartbeatEntityMapper.selectHeartbeats(component, instance, inlongGroupId);
-        List<StreamHeartbeatResponse>  streamHeartBeatResponses = new ArrayList<>();
-        PageInfo<StreamHeartbeatResponse> pageInfo;
-        if (entityPage != null) {
-            streamHeartBeatResponses = CommonBeanUtils
-                    .copyListProperties(entityPage, StreamHeartbeatResponse::new);
-        }
-        pageInfo = new PageInfo<>(streamHeartBeatResponses);
-        pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+    private PageInfo<StreamHeartbeatResponse> listStreamHeartbeatOpt(HeartbeatPageRequest request) {
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        Page<StreamHeartbeatEntity> entityPage = (Page<StreamHeartbeatEntity>)
+                streamHeartbeatMapper.selectByCondition(request);
+        List<StreamHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
+                StreamHeartbeatResponse::new);
+        // The copied list is a plain List, so the total must come from the queried Page, not from this page's size
+        PageInfo<StreamHeartbeatResponse> pageInfo = new PageInfo<>(responseList);
+        pageInfo.setTotal(entityPage.getTotal());
         return pageInfo;
     }
+
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
new file mode 100644
index 000000000..06d0cc5d2
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
+import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.BDDMockito.given;
+
+@RunWith(SpringRunner.class)
+public class HeartbeatServiceTest {
+
+    @InjectMocks
+    private HeartbeatService heartbeatService = new HeartbeatServiceImpl();
+    @Mock
+    private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
+    @Mock
+    private GroupHeartbeatEntityMapper groupHeartbeatMapper;
+    @Mock
+    private StreamHeartbeatEntityMapper streamHeartbeatMapper;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+        ComponentHeartbeatEntity componentHeartbeat = new ComponentHeartbeatEntity();
+        componentHeartbeat.setComponent("Sort");
+        componentHeartbeat.setInstance("127.0.0.1");
+        componentHeartbeat.setStatusHeartbeat("[{\"status\":\"running\"}]");
+        componentHeartbeat.setMetricHeartbeat("[{\"mem\":\"16gb\",\"cpu\":\"60%\"}]");
+        componentHeartbeat.setReportTime(System.currentTimeMillis());
+        Page<ComponentHeartbeatEntity> componentPage = new Page<>();
+        componentPage.add(componentHeartbeat);
+        componentPage.setTotal(1);
+        given(componentHeartbeatMapper.insert(new ComponentHeartbeatEntity())).willReturn(1);
+        given(componentHeartbeatMapper.selectByKey(Mockito.anyString(),
+                Mockito.anyString())).willReturn(componentHeartbeat);
+        given(componentHeartbeatMapper.selectByCondition(Mockito.any())).willReturn(componentPage);
+
+        GroupHeartbeatEntity groupHeartbeat = new GroupHeartbeatEntity();
+        groupHeartbeat.setComponent("Sort");
+        groupHeartbeat.setInstance("127.0.0.1");
+        groupHeartbeat.setStatusHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+                + ": \"10\"},\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        groupHeartbeat.setReportTime(System.currentTimeMillis());
+        groupHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+                + ": \"10\"},"
+                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        Page<GroupHeartbeatEntity> groupPage = new Page<>();
+        groupPage.add(groupHeartbeat);
+        groupPage.setTotal(1);
+        given(groupHeartbeatMapper.selectByKey(Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString())).willReturn(groupHeartbeat);
+        given(groupHeartbeatMapper.selectByCondition(Mockito.any())).willReturn(groupPage);
+
+        StreamHeartbeatEntity streamHeartbeat = new StreamHeartbeatEntity();
+        streamHeartbeat.setComponent("Sort");
+        streamHeartbeat.setInstance("127.0.0.1");
+        streamHeartbeat.setInlongGroupId("group1");
+        streamHeartbeat.setInlongStreamId("test_test");
+        streamHeartbeat.setStatusHeartbeat("[{\"statue\":\"running\"}]");
+        streamHeartbeat.setMetricHeartbeat("[{\"outMsg\":\"1\",\"inMsg\":2}]");
+        streamHeartbeat.setReportTime(System.currentTimeMillis());
+
+        Page<StreamHeartbeatEntity> streamPage = new Page<>();
+        streamPage.add(streamHeartbeat);
+        streamPage.setTotal(1);
+
+        given(streamHeartbeatMapper.selectByKey(Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+                .willReturn(streamHeartbeat);
+        given(streamHeartbeatMapper.selectByCondition(Mockito.any())).willReturn(streamPage);
+    }
+
+    @Test
+    public void testReportHeartbeat() {
+        HeartbeatReportRequest request = new HeartbeatReportRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        request.setReportTime(Instant.now().toEpochMilli());
+
+        ComponentHeartbeat componentHeartbeat = new ComponentHeartbeat();
+        componentHeartbeat.setMetricHeartbeat("{\"mem\":\"100\"}");
+        componentHeartbeat.setStatusHeartbeat("{\"runningTime\":\"10h.35m\",\"status\":\"10h.35m\","
+                + "\"groupIds\":\"group1,group2\"}");
+        request.setComponentHeartbeat(componentHeartbeat);
+
+        List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
+        GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
+        groupHeartbeat.setInlongGroupId("group1");
+        groupHeartbeat.setStatusHeartbeat("[{\"status\":\"running\",\"streamIds\":\"1,2,3,4\"}]");
+        groupHeartbeats.add(groupHeartbeat);
+        request.setGroupHeartbeats(groupHeartbeats);
+        StreamHeartbeat streamHeartbeat = new StreamHeartbeat();
+        streamHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+                + ": \"10\"},\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        streamHeartbeat.setStatusHeartbeat("{}");
+        streamHeartbeat.setInlongGroupId("group1");
+        streamHeartbeat.setInlongStreamId("stream1");
+        List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
+        streamHeartbeats.add(streamHeartbeat);
+        request.setStreamHeartbeats(streamHeartbeats);
+
+        Assert.assertTrue(heartbeatService.reportHeartbeat(request));
+    }
+
+    @Test
+    public void testGetComponentHeartbeat() {
+        HeartbeatQueryRequest request = new HeartbeatQueryRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        ComponentHeartbeatResponse response = heartbeatService.getComponentHeartbeat(request);
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    @Test
+    public void testGetGroupHeartbeat() {
+        HeartbeatQueryRequest request = new HeartbeatQueryRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        request.setInlongGroupId("group1");
+        GroupHeartbeatResponse response = heartbeatService.getGroupHeartbeat(request);
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    @Test
+    public void testGetStreamHeartbeat() {
+        HeartbeatQueryRequest request = new HeartbeatQueryRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        request.setInlongGroupId("group1");
+        request.setInlongStreamId("stream1");
+
+        StreamHeartbeatResponse response = heartbeatService.getStreamHeartbeat(request);
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    @Test
+    public void testListComponentHeartbeat() {
+        HeartbeatPageRequest request = new HeartbeatPageRequest();
+        request.setComponent("Sort");
+        request.setPageNum(1);
+        request.setPageSize(10);
+        PageInfo<ComponentHeartbeatResponse> pageResponse = heartbeatService.listComponentHeartbeat(request);
+        Assert.assertEquals(1, pageResponse.getTotal());
+    }
+
+    @Test
+    public void testListGroupHeartbeat() {
+        HeartbeatPageRequest request = new HeartbeatPageRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        request.setPageNum(1);
+        request.setPageSize(10);
+        PageInfo<GroupHeartbeatResponse> pageResponse = heartbeatService.listGroupHeartbeat(request);
+        Assert.assertEquals(1, pageResponse.getTotal());
+    }
+
+    @Test
+    public void testListStreamHeartbeat() {
+        HeartbeatPageRequest request = new HeartbeatPageRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        request.setInlongGroupId("group1");
+        request.setPageNum(1);
+        request.setPageSize(10);
+        PageInfo<StreamHeartbeatResponse> pageResponse = heartbeatService.listStreamHeartbeat(request);
+        Assert.assertEquals(1, pageResponse.getTotal());
+    }
+}
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index d797e2091..7d8146bb3 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -621,22 +621,22 @@ CREATE TABLE `stream_source_field`
 DROP TABLE IF EXISTS `stream_transform_field`;
 CREATE TABLE `stream_transform_field`
 (
-    `id`               int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
     `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-    `transform_id`     int(11) NOT NULL COMMENT 'Transform id',
+    `transform_id`     int(11)      NOT NULL COMMENT 'Transform id',
     `transform_type`   varchar(15)  NOT NULL COMMENT 'Transform type',
     `field_name`       varchar(50)  NOT NULL COMMENT 'Field name',
     `field_value`      varchar(128)  DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
     `pre_expression`   varchar(256)  DEFAULT NULL COMMENT 'Pre-defined field value expression',
     `field_type`       varchar(50)  NOT NULL COMMENT 'Field type',
     `field_comment`    varchar(2000) DEFAULT NULL COMMENT 'Field description',
-    `is_meta_field`    smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+    `is_meta_field`    smallint(3)   DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
     `field_format`     varchar(50)   DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
-    `rank_num`         smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
-    `is_deleted`       int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `rank_num`         smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+    `is_deleted`       int(11)       DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
     PRIMARY KEY (`id`),
-    KEY                `index_transform_id` (`transform_id`)
+    KEY `index_transform_id` (`transform_id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
 
@@ -1105,8 +1105,7 @@ CREATE TABLE `sort_source_config`
     `ext_params`   text         DEFAULT NULL COMMENT 'Another fields, will saved as JSON type',
     PRIMARY KEY (`id`),
     KEY `index_sort_source_config` (`cluster_name`, `task_name`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
+);
 
 
 -- ----------------------------
@@ -1126,69 +1125,65 @@ CREATE TABLE `stream_config_log`
     `report_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`ip`, `config_name`, `component_name`, `log_type`, `inlong_stream_id`, `inlong_group_id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
+);
 
 -- ----------------------------
--- Table structure for inlong beat heart
+-- Table structure for inlong component heartbeat
 -- ----------------------------
 DROP TABLE IF EXISTS `component_heartbeat`;
 CREATE TABLE `component_heartbeat`
 (
-    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
-    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
-    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
-    `status_heartbeat`        text         NOT NULL COMMENT 'component status info',
-    `metric_heartbeat`        text         NOT NULL COMMENT 'component metric info',
-    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
-    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `id`               int(11)     NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `component`        varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+    `instance`         varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+    `status_heartbeat` text        NOT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text        NOT NULL COMMENT 'Metric heartbeat info',
+    `report_time`      bigint(20)  NOT NULL COMMENT 'Report time',
+    `create_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_beat_heart` (`component`, `instance`),
-    KEY `index_report_time` (`report_time`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+    UNIQUE KEY `unique_component_heartbeat` (`component`, `instance`)
+);
 
 -- ----------------------------
--- Table structure for inlong stream status info
+-- Table structure for inlong group heartbeat
 -- ----------------------------
 DROP TABLE IF EXISTS `group_heartbeat`;
 CREATE TABLE `group_heartbeat`
 (
-    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
-    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
-    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
-    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
-    `status_heartbeat`        text         NOT NULL COMMENT 'Inlong group status info',
-    `metric_heartbeat`        text         NOT NULL COMMENT 'Inlong group metric info',
-    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
-    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `component`        varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+    `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+    `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+    `status_heartbeat` text         NOT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text         NOT NULL COMMENT 'Metric heartbeat info',
+    `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+    UNIQUE KEY `unique_group_heartbeat` (`component`, `instance`, `inlong_group_id`)
+);
 
 -- ----------------------------
--- Table structure for inlong component static info
+-- Table structure for inlong stream heartbeat
 -- ----------------------------
 DROP TABLE IF EXISTS `stream_heartbeat`;
 CREATE TABLE `stream_heartbeat`
 (
-    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
-    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
-    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
-    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
-    `inlong_stream_id`        varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
-    `status_heartbeat`        text         NOT NULL COMMENT 'status info',
-    `metric_heartbeat`        text         NOT NULL COMMENT 'static info',
-    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
-    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `component`        varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+    `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+    `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+    `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
+    `status_heartbeat` text         NOT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text         NOT NULL COMMENT 'Metric heartbeat info',
+    `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+    UNIQUE KEY `unique_stream_heartbeat` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
+);
+
 -- ----------------------------
 
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 138d0f68b..5d501a001 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1182,67 +1182,65 @@ CREATE TABLE `stream_config_log`
   DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
 
 -- ----------------------------
--- Table structure for inlong beat heart
+-- Table structure for inlong component heartbeat (one row per component + instance)
 -- ----------------------------
 DROP TABLE IF EXISTS `component_heartbeat`;
 CREATE TABLE `component_heartbeat`
 (
-    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
-    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
-    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
-    `status_heartbeat`        text         NOT NULL COMMENT 'component status info',
-    `metric_heartbeat`        text         NOT NULL COMMENT 'component metric info',
-    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
-    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `id`               int(11)     NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `component`        varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+    `instance`         varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+    `status_heartbeat` text        NOT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text        NOT NULL COMMENT 'Metric heartbeat info',
+    `report_time`      bigint(20)  NOT NULL COMMENT 'Report time',
+    `create_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_beat_heart` (`component`, `instance`),
-    KEY `index_report_time` (`report_time`)
+    UNIQUE KEY `unique_component_heartbeat` (`component`, `instance`)
 ) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+  DEFAULT CHARSET = utf8 COMMENT ='Inlong component heartbeat';
 
 -- ----------------------------
--- Table structure for inlong stream status info
+-- Table structure for inlong group heartbeat (one row per component + instance + group)
 -- ----------------------------
 DROP TABLE IF EXISTS `group_heartbeat`;
 CREATE TABLE `group_heartbeat`
 (
-    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
-    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
-    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
-    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
-    `status_heartbeat`        text         NOT NULL COMMENT 'Inlong group status info',
-    `metric_heartbeat`        text         NOT NULL COMMENT 'Inlong group metric info',
-    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
-    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `component`        varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+    `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+    `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+    `status_heartbeat` text         NOT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text         NOT NULL COMMENT 'Metric heartbeat info',
+    `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
-    KEY `index_report_time` (`report_time`)
+    UNIQUE KEY `unique_group_heartbeat` (`component`, `instance`, `inlong_group_id`)
 ) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+  DEFAULT CHARSET = utf8 COMMENT ='Inlong group heartbeat';
 
 -- ----------------------------
--- Table structure for inlong component static info
+-- Table structure for inlong stream heartbeat (one row per component + instance + group + stream)
 -- ----------------------------
 DROP TABLE IF EXISTS `stream_heartbeat`;
 CREATE TABLE `stream_heartbeat`
 (
-    `id`                      int(11)      NOT NULL AUTO_INCREMENT COMMENT 'id',
-    `component`               varchar(64)  NOT NULL DEFAULT '' COMMENT 'component',
-    `instance`                varchar(64)  NOT NULL DEFAULT '' COMMENT 'component ip or id',
-    `inlong_group_id`         varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
-    `inlong_stream_id`        varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
-    `status_heartbeat`        text         NOT NULL COMMENT 'status info',
-    `metric_heartbeat`        text         NOT NULL COMMENT 'static info',
-    `report_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
-    `modify_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `create_time`             timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `component`        varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+    `instance`         varchar(64)  NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+    `inlong_group_id`  varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+    `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
+    `status_heartbeat` text         NOT NULL COMMENT 'Status heartbeat info',
+    `metric_heartbeat` text         NOT NULL COMMENT 'Metric heartbeat info',
+    `report_time`      bigint(20)   NOT NULL COMMENT 'Report time',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
-    UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
-    KEY `index_report_time` (`report_time`)
+    UNIQUE KEY `unique_stream_heartbeat` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
 ) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+  DEFAULT CHARSET = utf8 COMMENT ='Inlong stream heartbeat';
+
 -- ----------------------------
 
 SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatController.java
new file mode 100644
index 000000000..080f0a80f
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatController.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import com.github.pagehelper.PageInfo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Heartbeat query controller: exposes the get/list endpoints under {@code /heartbeat} and
+ * delegates every call to {@link HeartbeatService}.
+ */
+@RestController
+@RequestMapping("/heartbeat")
+@Api(tags = "Heartbeat-API")
+public class HeartbeatController {
+
+    @Autowired
+    private HeartbeatService heartbeatService;
+
+    /**
+     * Gets a single component heartbeat via {@link HeartbeatService#getComponentHeartbeat}.
+     *
+     * @param request heartbeat query request taken from the POST body
+     * @return the service result wrapped in a success response
+     */
+    @PostMapping(value = "/component/get")
+    @ApiOperation(value = "Get component heartbeat")
+    public Response<ComponentHeartbeatResponse> getComponentHeartbeat(@RequestBody HeartbeatQueryRequest request) {
+        return Response.success(heartbeatService.getComponentHeartbeat(request));
+    }
+
+    /**
+     * Gets a single group heartbeat via {@link HeartbeatService#getGroupHeartbeat}.
+     *
+     * @param request heartbeat query request taken from the POST body
+     * @return the service result wrapped in a success response
+     */
+    @PostMapping(value = "/group/get")
+    @ApiOperation(value = "Get group heartbeat")
+    public Response<GroupHeartbeatResponse> getGroupHeartbeat(@RequestBody HeartbeatQueryRequest request) {
+        return Response.success(heartbeatService.getGroupHeartbeat(request));
+    }
+
+    /**
+     * Gets a single stream heartbeat via {@link HeartbeatService#getStreamHeartbeat}.
+     *
+     * @param request heartbeat query request taken from the POST body
+     * @return the service result wrapped in a success response
+     */
+    @PostMapping(value = "/stream/get")
+    @ApiOperation(value = "Get stream heartbeat")
+    public Response<StreamHeartbeatResponse> getStreamHeartbeat(@RequestBody HeartbeatQueryRequest request) {
+        return Response.success(heartbeatService.getStreamHeartbeat(request));
+    }
+
+    /**
+     * Lists component heartbeats as a page via {@link HeartbeatService#listComponentHeartbeat}.
+     *
+     * @param request heartbeat page request taken from the POST body
+     * @return the page returned by the service, wrapped in a success response
+     */
+    @PostMapping(value = "/component/list")
+    @ApiOperation(value = "List component heartbeats")
+    public Response<PageInfo<ComponentHeartbeatResponse>> listComponentHeartbeat(
+            @RequestBody HeartbeatPageRequest request) {
+        return Response.success(heartbeatService.listComponentHeartbeat(request));
+    }
+
+    /**
+     * Lists group heartbeats as a page via {@link HeartbeatService#listGroupHeartbeat}.
+     *
+     * @param request heartbeat page request taken from the POST body
+     * @return the page returned by the service, wrapped in a success response
+     */
+    @PostMapping(value = "/group/list")
+    @ApiOperation(value = "List group heartbeats")
+    public Response<PageInfo<GroupHeartbeatResponse>> listGroupHeartbeat(@RequestBody HeartbeatPageRequest request) {
+        return Response.success(heartbeatService.listGroupHeartbeat(request));
+    }
+
+    /**
+     * Lists stream heartbeats as a page via {@link HeartbeatService#listStreamHeartbeat}.
+     *
+     * @param request heartbeat page request taken from the POST body
+     * @return the page returned by the service, wrapped in a success response
+     */
+    @PostMapping(value = "/stream/list")
+    @ApiOperation(value = "List stream heartbeats")
+    public Response<PageInfo<StreamHeartbeatResponse>> listStreamHeartbeat(@RequestBody HeartbeatPageRequest request) {
+        return Response.success(heartbeatService.listStreamHeartbeat(request));
+    }
+
+}
+
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
deleted file mode 100644
index 4087499c2..000000000
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.web.controller;
-
-import com.github.pagehelper.PageInfo;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatPageRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatPageRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatPageRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
-import org.apache.inlong.manager.service.core.HeartbeatService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RestController;
-
-@RestController
-@RequestMapping("/heartbeat/")
-@Api(tags = "Heartbeat")
-public class HeartbeatInfoController {
-
-    @Autowired
-    private HeartbeatService heartbeatService;
-
-    @RequestMapping(value = "/component/info", method = RequestMethod.POST)
-    @ApiOperation(value = "query component heartbeat")
-    public Response<ComponentHeartbeatResponse> queryComponentHeartbeatInfo(@RequestBody
-            ComponentHeartbeatRequest info) {
-        ComponentHeartbeatResponse response =
-                heartbeatService.getComponentHeartbeatInfo(info.getComponent(),
-                info.getInstance());
-        if (response == null) {
-            return Response.fail("Not found msg!");
-        } else {
-            return Response.success(response);
-        }
-    }
-
-    @RequestMapping(value = "/group/info", method = RequestMethod.POST)
-    @ApiOperation(value = "query group heartbeat")
-    public Response<GroupHeartbeatResponse> queryGroupHeartbeatInfo(@RequestBody
-            GroupHeartbeatRequest info) {
-        GroupHeartbeatResponse response = heartbeatService
-                .getGroupHeartbeatInfo(info.getComponent(), info.getInstance(),
-                        info.getInlongGroupId());
-        if (response == null) {
-            return Response.fail("Not found msg!");
-        } else {
-            return Response.success(response);
-        }
-    }
-
-    @RequestMapping(value = "/stream/info", method = RequestMethod.POST)
-    @ApiOperation(value = "query stream heartbeat")
-    public Response<StreamHeartbeatResponse> queryStreamHeartbeat(@RequestBody
-            StreamHeartbeatRequest info) {
-        StreamHeartbeatResponse response = heartbeatService
-                .getStreamHeartbeatInfo(info.getComponent(),
-                info.getInstance(), info.getInlongGroupId(), info.getInlongStreamId());
-        if (response == null) {
-            return Response.fail("Not found msg!");
-        } else {
-            return Response.success(response);
-        }
-    }
-
-    @RequestMapping(value = "/component/list", method = RequestMethod.POST)
-    @ApiOperation(value = "query component heartbeats")
-    public Response<PageInfo<ComponentHeartbeatResponse>> queryComponentHeartbeatInfos(@RequestBody
-            ComponentHeartbeatPageRequest info) {
-        PageInfo<ComponentHeartbeatResponse> responses =
-                heartbeatService.getComponentHeartbeatInfos(info.getComponent(), info.getPageNum(),
-                        info.getPageSize());
-        if (responses == null) {
-            return Response.fail("Not found msg!");
-        } else {
-            return Response.success(responses);
-        }
-    }
-
-    @RequestMapping(value = "/group/list", method = RequestMethod.POST)
-    @ApiOperation(value = "query group heartbeats")
-    public Response<PageInfo<GroupHeartbeatResponse>> queryGroupHeartbeatInfos(@RequestBody
-            GroupHeartbeatPageRequest info) {
-        PageInfo<GroupHeartbeatResponse> responses = heartbeatService
-                .getGroupHeartbeatInfos(info.getComponent(), info.getInstance(),
-                        info.getPageNum(), info.getPageSize());
-        if (responses == null) {
-            return Response.fail("Not found msg!");
-        } else {
-            return Response.success(responses);
-        }
-    }
-
-    @RequestMapping(value = "/stream/list", method = RequestMethod.POST)
-    @ApiOperation(value = "query stream heartbeats")
-    public Response<PageInfo<StreamHeartbeatResponse>> queryStreamHeartbeats(@RequestBody
-            StreamHeartbeatPageRequest info) {
-        PageInfo<StreamHeartbeatResponse> responses = heartbeatService
-                .getStreamHeartbeatInfos(info.getComponent(), info.getInstance(),
-                        info.getInlongGroupId(), info.getPageNum(), info.getPageSize());
-        if (responses == null) {
-            return Response.fail("Not found msg!");
-        } else {
-            return Response.success(responses);
-        }
-    }
-
-}
-
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenHeartbeatController.java
similarity index 77%
rename from inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
rename to inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenHeartbeatController.java
index ea6cd0e42..5dbf44a34 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenHeartbeatController.java
@@ -23,7 +23,6 @@ import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
 import org.apache.inlong.manager.service.core.HeartbeatService;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -31,17 +30,17 @@ import org.springframework.web.bind.annotation.RestController;
 
 @RestController
 @RequestMapping("/openapi/heartbeat")
-@Api(tags = "Heartbeat")
-public class HeartbeatController {
+@Api(tags = "Open-Heartbeat-API")
+public class OpenHeartbeatController { // open API controller that accepts heartbeat reports
 
     @Autowired
-    private HeartbeatService heartBeatService;
+    private HeartbeatService heartbeatService;
 
-    @PostMapping(value = "/report",
-            produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
+    @PostMapping(value = "/report") // combined with the class-level mapping: /openapi/heartbeat/report
     @ApiOperation(value = "heartbeat report")
-    public Response<String> reportHeartbeat(@RequestBody HeartbeatReportRequest info) {
-        return Response.success(heartBeatService.reportHeartbeatInfo(info));
+    public Response<Boolean> reportHeartbeat(@RequestBody HeartbeatReportRequest info) {
+        return Response.success(heartbeatService.reportHeartbeat(info));
     }
+
 }
 
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
deleted file mode 100644
index 7ed34b8b0..000000000
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.web.controller.openapi;
-
-import static org.mockito.BDDMockito.given;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageInfo;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
-import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
-import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
-import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
-import org.apache.inlong.manager.service.core.HeartbeatService;
-import org.apache.inlong.manager.service.core.impl.HeartbeatServiceImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-public class HeartbeatControllerTest {
-
-    @InjectMocks
-    private HeartbeatService heartbeatService = new HeartbeatServiceImpl();
-
-    @Mock
-    private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
-
-    @Mock
-    private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
-
-    @Mock
-    private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
-
-    @Before
-    public void setUp() {
-        MockitoAnnotations.openMocks(this);
-
-        ComponentHeartbeatEntityWithBLOBs componentHeartbeatEntityWithBLOBs =
-                new ComponentHeartbeatEntityWithBLOBs();
-        componentHeartbeatEntityWithBLOBs.setComponent("Sort");
-        componentHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
-        componentHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"inlongGroupId\":\"groupId\","
-                + "\"componentStaticInfo\":\"\"}]");
-        componentHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"inlongGroupId\":\"groupId\","
-                + "\"streamStatusInfo\":\"\"}]");
-        componentHeartbeatEntityWithBLOBs.setReportTime(new Date());
-        Page<ComponentHeartbeatEntityWithBLOBs> componentPage = new Page<>();
-        componentPage.add(componentHeartbeatEntityWithBLOBs);
-        componentPage.setTotal(1);
-        given(componentHeartbeatEntityMapper.insert(new ComponentHeartbeatEntityWithBLOBs())).willReturn(1);
-        given(componentHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
-                Mockito.anyString())).willReturn(componentHeartbeatEntityWithBLOBs);
-        given(componentHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString()))
-                .willReturn(componentPage);
-
-        GroupHeartbeatEntityWithBLOBs groupHeartbeatEntityWithBLOBs =
-                new GroupHeartbeatEntityWithBLOBs();
-        groupHeartbeatEntityWithBLOBs.setComponent("Sort");
-        groupHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
-        groupHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
-                + ": \"10\"},"
-                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
-        groupHeartbeatEntityWithBLOBs.setReportTime(new Date());
-        groupHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
-                + ": \"10\"},"
-                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
-        Page<GroupHeartbeatEntityWithBLOBs> groupPage = new Page<>();
-        groupPage.add(groupHeartbeatEntityWithBLOBs);
-        groupPage.setTotal(1);
-        given(groupHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
-                Mockito.anyString(), Mockito.anyString())).willReturn(groupHeartbeatEntityWithBLOBs);
-        given(groupHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString(),
-                Mockito.anyString())).willReturn(groupPage);
-
-        StreamHeartbeatEntityWithBLOBs streamHeartbeatEntityWithBLOBs = new StreamHeartbeatEntityWithBLOBs();
-        streamHeartbeatEntityWithBLOBs.setComponent("Sort");
-        streamHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
-        streamHeartbeatEntityWithBLOBs.setInlongGroupId("group1");
-        streamHeartbeatEntityWithBLOBs.setInlongStreamId("test_test");
-        streamHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"statue\":\"running\"}]");
-        streamHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"outMsg\":\"1\",\"inMsg\":2}]");
-        streamHeartbeatEntityWithBLOBs.setReportTime(new Date());
-
-        Page<StreamHeartbeatEntityWithBLOBs> streamPage = new Page<>();
-        streamPage.add(streamHeartbeatEntityWithBLOBs);
-        streamPage.setTotal(1);
-
-        given(streamHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
-                Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
-                .willReturn(streamHeartbeatEntityWithBLOBs);
-        given(streamHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString(),
-                Mockito.anyString(), Mockito.anyString()))
-                .willReturn(streamPage);
-    }
-
-    @Test
-    public void testAddHeartbeat() throws Exception {
-
-        HeartbeatReportRequest request = new HeartbeatReportRequest();
-        request.setComponent("Sort");
-        request.setInstance("127.0.0.1");
-        request.setReportTimestamp(Instant.now().toEpochMilli());
-
-        ComponentHeartbeat componentHeartbeat = new ComponentHeartbeat();
-        componentHeartbeat.setMetricHeartbeat("{\"mem\":\"100\"}");
-        componentHeartbeat.setStatusHeartbeat("{\"runningTime\":\"10h.35m\","
-                + "\"status\":\"10h.35m\","
-                + "\"groupIds\":\"group1,group2\"}");
-
-        List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
-        GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
-        groupHeartbeat.setInlongGroupId("group1");
-        groupHeartbeat.setStatusHeartbeat("[{\"status\":\"running\",\"streamIds\":\"1,2,3,4\"}]");
-        request.setGroupHeartbeats(groupHeartbeats);
-
-        StreamHeartbeat streamHeartbeat = new StreamHeartbeat();
-        streamHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
-                + ": \"10\"},"
-                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
-        streamHeartbeat.setStatusHeartbeat("{}");
-        streamHeartbeat.setInlongGroupId("group1");
-        streamHeartbeat.setInlongStreamId("1");
-        List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
-        streamHeartbeats.add(streamHeartbeat);
-        request.setStreamHeartbeats(streamHeartbeats);
-
-        Assert.assertEquals("Success", heartbeatService.reportHeartbeatInfo(request));
-    }
-
-    @Test
-    public void testQueryComponentHeartbeat() throws Exception {
-        ComponentHeartbeatResponse response
-                = heartbeatService.getComponentHeartbeatInfo("Sort", "127.0.0.1");
-        Assert.assertEquals("127.0.0.1", response.getInstance());
-    }
-
-    @Test
-    public void testQueryGroupHeartbeat() throws Exception {
-        GroupHeartbeatResponse response
-                = heartbeatService.getGroupHeartbeatInfo("Sort",
-                "127.0.0.1", "group1");
-        Assert.assertEquals("127.0.0.1", response.getInstance());
-    }
-
-    @Test
-    public void testQueryStreamHeartbeat() throws Exception {
-        StreamHeartbeatResponse response =
-                heartbeatService.getStreamHeartbeatInfo("Sort",
-                        "127.0.0.1", "group1", "stream1");
-        Assert.assertEquals("127.0.0.1", response.getInstance());
-    }
-
-    @Test
-    public void testQueryComponentHeartbeatPage() throws Exception {
-        PageInfo<ComponentHeartbeatResponse> pageResponse
-                = heartbeatService.getComponentHeartbeatInfos("Sort", 1,
-                10);
-        Assert.assertEquals(1, pageResponse.getTotal());
-    }
-
-    @Test
-    public void testQueryGroupHeartbeatPage() throws Exception {
-        PageInfo<GroupHeartbeatResponse> pageResponse
-                = heartbeatService.getGroupHeartbeatInfos("Sort",
-                "127.0.0.1", 1, 10);
-        Assert.assertEquals(1, pageResponse.getTotal());
-    }
-
-    @Test
-    public void testQueryStreamHeartbeatPage() throws Exception {
-        PageInfo<StreamHeartbeatResponse> pageResponse =
-                heartbeatService.getStreamHeartbeatInfos("Sort",
-                        "127.0.0.1", "group1", 1, 10);
-        Assert.assertEquals(1, pageResponse.getTotal());
-    }
-}