You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/26 03:09:18 UTC
[incubator-inlong] branch master updated: [INLONG-3856][Manager] Add beat heart common report handler (#3871)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 38a320807 [INLONG-3856][Manager] Add beat heart common report handler (#3871)
38a320807 is described below
commit 38a3208078d3d6926d7a682c77d0ba04feafa18d
Author: baomingyu <ba...@163.com>
AuthorDate: Tue Apr 26 11:09:13 2022 +0800
[INLONG-3856][Manager] Add beat heart common report handler (#3871)
---
.../inlong/manager/common/enums/ErrorCodeEnum.java | 2 +
.../common/pojo/heartbeat/ComponentHeartbeat.java} | 27 +-
.../heartbeat/ComponentHeartbeatPageRequest.java} | 35 +-
.../pojo/heartbeat/ComponentHeartbeatRequest.java} | 34 +-
.../heartbeat/ComponentHeartbeatResponse.java} | 41 +-
.../common/pojo/heartbeat/GroupHeartbeat.java} | 27 +-
.../pojo/heartbeat/GroupHeartbeatPageRequest.java} | 35 +-
.../pojo/heartbeat/GroupHeartbeatRequest.java} | 35 +-
.../pojo/heartbeat/GroupHeartbeatResponse.java} | 44 +-
.../pojo/heartbeat/HeartbeatReportRequest.java} | 43 +-
.../common/pojo/heartbeat/StreamHeartbeat.java} | 29 +-
.../heartbeat/StreamHeartbeatPageRequest.java} | 37 +-
.../pojo/heartbeat/StreamHeartbeatRequest.java} | 38 +-
.../pojo/heartbeat/StreamHeartbeatResponse.java} | 47 +-
.../dao/entity/ComponentHeartbeatEntity.java} | 46 +-
.../entity/ComponentHeartbeatEntityWithBLOBs.java} | 29 +-
.../manager/dao/entity/GroupHeartbeatEntity.java} | 48 +-
.../dao/entity/GroupHeartbeatEntityWithBLOBs.java} | 28 +-
.../manager/dao/entity/StreamHeartbeatEntity.java} | 50 ++-
.../entity/StreamHeartbeatEntityWithBLOBs.java} | 29 +-
.../mapper/ComponentHeartbeatEntityMapper.java} | 44 +-
.../dao/mapper/GroupHeartbeatEntityMapper.java} | 46 +-
.../dao/mapper/StreamHeartbeatEntityMapper.java | 41 ++
.../src/main/resources/generatorConfig.xml | 18 +
.../mappers/ComponentHeartbeatEntityMapper.xml | 83 ++++
.../mappers/GroupHeartbeatEntityMapper.xml | 86 ++++
.../mappers/StreamHeartbeatEntityMapper.xml | 87 ++++
.../manager/service/core/HeartbeatService.java | 101 +++++
.../service/core/impl/HeartbeatServiceImpl.java | 500 +++++++++++++++++++++
inlong-manager/manager-test/pom.xml | 9 +-
.../org/apache/inlong/manager/test/BaseTest.java | 3 +
.../main/resources/sql/apache_inlong_manager.sql | 85 ++++
.../manager-web/sql/apache_inlong_manager.sql | 62 +++
.../web/controller/HeartbeatInfoController.java | 133 ++++++
.../controller/openapi/HeartbeatController.java | 47 ++
.../openapi/HeartbeatControllerTest.java | 210 +++++++++
inlong-manager/pom.xml | 7 +
37 files changed, 1844 insertions(+), 422 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index f50e56b8f..3ec8491a2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -27,6 +27,8 @@ public enum ErrorCodeEnum {
GROUP_ID_IS_EMPTY(102, "Inlong group id is empty"),
STREAM_ID_IS_EMPTY(103, "Inlong stream id is empty"),
REQUEST_IS_EMPTY(104, "Request is empty"),
+ REQUEST_COMPONENT_EMPTY(105, "Component is empty"),
+ REQUEST_INSTANCE_EMPTY(106, "Instance is empty"),
USER_IS_NOT_MANAGER(110, "%s is not the manager, please contact %s"),
GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
index a588fabd9..61cc41911 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
@@ -15,27 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class ComponentHeartbeat {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ private String statusHeartbeat;
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ private String metricHeartbeat;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
index a588fabd9..06d95756f 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
@@ -15,27 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.inlong.manager.common.beans.PageRequest;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong component heartbeats query request")
+public class ComponentHeartbeatPageRequest
+ extends PageRequest {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private String component;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
index a588fabd9..bb2dd27c4 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
@@ -15,27 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong component heartbeat query request")
+public class ComponentHeartbeatRequest {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private String instance;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
index a588fabd9..68e944a7e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
@@ -15,27 +15,32 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong component heartbeat query response")
+public class ComponentHeartbeatResponse {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
+ @ApiModelProperty
+ private String instance;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private long reportTime;
+
+ @ApiModelProperty
+ private String statusHeartbeat;
+
+ @ApiModelProperty
+ private String metricHeartbeat;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
index a588fabd9..f2e81f9ed 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
@@ -15,27 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class GroupHeartbeat {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ private String inlongGroupId;
- @Autowired
- DataSource dataSource;
+ private String statusHeartbeat;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ private String metricHeartbeat;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
index a588fabd9..23e72f3d0 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
@@ -15,27 +15,24 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.inlong.manager.common.beans.PageRequest;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+@Getter
+@Setter
+@ApiModel("Inlong group heartbeats request")
+public class GroupHeartbeatPageRequest extends PageRequest {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private String instance;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
index a588fabd9..79ec29a78 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
@@ -15,27 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+@Getter
+@Setter
+@ApiModel("Inlong group heartbeat request")
+public class GroupHeartbeatRequest {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
+ @ApiModelProperty
+ private String instance;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private String inlongGroupId;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
index a588fabd9..ed3ad688e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
@@ -15,27 +15,35 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+@Getter
+@Setter
+@ApiModel("Inlong group heartbeat response")
+public class GroupHeartbeatResponse {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
+ @ApiModelProperty
+ private String instance;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private long reportTime;
+
+ @ApiModelProperty
+ private String inlongGroupId;
+
+ @ApiModelProperty
+ private String statusHeartbeat;
+
+ @ApiModelProperty
+ private String metricHeartbeat;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
index a588fabd9..7f9bed089 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
@@ -15,27 +15,34 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class HeartbeatReportRequest {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ private String component;
- @Autowired
- DataSource dataSource;
+ private String instance;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ private long reportTimestamp;
+
+ /*
+ * component
+ */
+ private ComponentHeartbeat componentHeartbeat;
+
+ /*
+ * group
+ */
+ private List<GroupHeartbeat> groupHeartbeats;
+
+ /*
+ * stream
+ */
+ private List<StreamHeartbeat> streamHeartbeats;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
index a588fabd9..391c62221 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
@@ -15,27 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+public class StreamHeartbeat {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ private String inlongGroupId;
- @Autowired
- DataSource dataSource;
+ private String inlongStreamId;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ private String statusHeartbeat;
+
+ private String metricHeartbeat;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
index a588fabd9..08ead5382 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
@@ -15,27 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.inlong.manager.common.beans.PageRequest;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong stream heartbeats query request")
+public class StreamHeartbeatPageRequest
+ extends PageRequest {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
+ @ApiModelProperty
+ private String instance;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private String inlongGroupId;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
index a588fabd9..c5420a0b1 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
@@ -15,27 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong stream heartbeat query request")
+public class StreamHeartbeatRequest {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
+ @ApiModelProperty
+ private String instance;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private String inlongGroupId;
+
+ @ApiModelProperty
+ private String inlongStreamId;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
index a588fabd9..4b808f090 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
@@ -15,27 +15,38 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.common.pojo.heartbeat;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.Getter;
+import lombok.Setter;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Getter
+@Setter
+@Data
+@ApiModel("Inlong stream heartbeat response")
+public class StreamHeartbeatResponse {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ @ApiModelProperty
+ private String component;
- @Autowired
- DataSource dataSource;
+ @ApiModelProperty
+ private String instance;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
+ @ApiModelProperty
+ private long reportTime;
+
+ @ApiModelProperty
+ private String inlongGroupId;
+
+ @ApiModelProperty
+ private String inlongStreamId;
+
+ @ApiModelProperty
+ private String statusHeartbeat;
+
+ @ApiModelProperty
+ private String metricHeartbeat;
}
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
index a588fabd9..b2d8ccca5 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
@@ -15,27 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+package org.apache.inlong.manager.dao.entity;
+
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Data;
+
+@Data
+public class ComponentHeartbeatEntity implements Serializable {
+ private Integer id;
+
+ private String component;
+
+ private String instance;
+
+ private Date reportTime;
+
+ private Date modifyTime;
+
+ private Date createTime;
+
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
index a588fabd9..a9eaa8691 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
@@ -15,27 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.dao.entity;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.io.Serializable;
+import lombok.Data;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+public class ComponentHeartbeatEntityWithBLOBs extends ComponentHeartbeatEntity implements Serializable {
+ private String statusHeartbeat;
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ private String metricHeartbeat;
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
index a588fabd9..230202497 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
@@ -15,27 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+package org.apache.inlong.manager.dao.entity;
+
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Data;
+
+@Data
+public class GroupHeartbeatEntity implements Serializable { // Non-BLOB columns of one group_heartbeat row (target type of BaseResultMap in GroupHeartbeatEntityMapper.xml).
+ private Integer id; // Column "id"; mapped as the <id> (primary key) of the result map.
+
+ private String component; // Column "component"; part of the (component, instance, inlongGroupId) key used by selectByKey/updateByKeyWithBLOBs.
+
+ private String instance; // Column "instance"; part of that lookup key.
+
+ private String inlongGroupId; // Column "inlong_group_id"; last part of that lookup key.
+
+ private Date reportTime; // Column "report_time"; written by the mapper's insert and updateByKeyWithBLOBs statements.
+
+ private Date modifyTime; // Column "modify_time"; only read by the mapper, never written by its insert/update. Presumably DB-maintained: confirm against the table schema.
+
+ private Date createTime; // Column "create_time"; only read by the mapper, never written by its insert/update. Presumably DB-maintained: confirm against the table schema.
+
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
index a588fabd9..f747cde3f 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
@@ -15,27 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.dao.entity;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.io.Serializable;
+import lombok.Data;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+public class GroupHeartbeatEntityWithBLOBs extends GroupHeartbeatEntity implements Serializable {
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ private String statusHeartbeat;
- @Autowired
- DataSource dataSource;
+ private String metricHeartbeat;
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
index a588fabd9..8ee2255c9 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
@@ -15,27 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+package org.apache.inlong.manager.dao.entity;
+
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Data;
+
+@Data
+public class StreamHeartbeatEntity implements Serializable { // Non-BLOB columns of one stream_heartbeat row (target type of BaseResultMap in StreamHeartbeatEntityMapper.xml).
+ private Integer id; // Column "id"; mapped as the <id> (primary key) of the result map.
+
+ private String component; // Column "component"; part of the (component, instance, inlongGroupId, inlongStreamId) key used by selectByKey/updateByKeyWithBLOBs.
+
+ private String instance; // Column "instance"; part of that lookup key.
+
+ private String inlongGroupId; // Column "inlong_group_id"; part of that lookup key.
+
+ private String inlongStreamId; // Column "inlong_stream_id"; last part of that lookup key.
+
+ private Date reportTime; // Column "report_time"; written by the mapper's insert and updateByKeyWithBLOBs statements.
+
+ private Date modifyTime; // Column "modify_time"; only read by the mapper, never written by its insert/update. Presumably DB-maintained: confirm against the table schema.
+
+ private Date createTime; // Column "create_time"; only read by the mapper, never written by its insert/update. Presumably DB-maintained: confirm against the table schema.
+
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
index a588fabd9..83ceb1633 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
@@ -15,27 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
+package org.apache.inlong.manager.dao.entity;
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
+import java.io.Serializable;
+import lombok.Data;
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
+@Data
+public class StreamHeartbeatEntityWithBLOBs extends StreamHeartbeatEntity implements Serializable {
+ private String statusHeartbeat;
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
+ private String metricHeartbeat;
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+ private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
similarity index 52%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
index a588fabd9..2e653dc7e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
@@ -15,27 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+package org.apache.inlong.manager.dao.mapper;
+
+import java.util.List;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+
+public interface ComponentHeartbeatEntityMapper { // MyBatis mapper for the component_heartbeat table; the SQL for each method lives in mappers/ComponentHeartbeatEntityMapper.xml.
+ int deleteByPrimaryKey(Integer id); // Deletes the row whose id column equals the argument.
+
+ int insert(ComponentHeartbeatEntityWithBLOBs record); // Inserts component, instance, report_time and both heartbeat BLOBs; id, modify_time and create_time are not part of the statement.
+
+ ComponentHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id); // Loads one row, BLOB columns included, by id.
+
+ ComponentHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
+ @Param("instance") String instance); // Loads the row matching component AND instance. Single-object return assumes that pair is unique: confirm against the schema.
+
+ List<ComponentHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component); // Loads every row for the given component, BLOB columns included.
+
+ int updateByKeyWithBLOBs(ComponentHeartbeatEntityWithBLOBs record); // Overwrites report_time and both heartbeat BLOBs on rows matching the record's component and instance (the id is not used).
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
similarity index 50%
copy from inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
index a588fabd9..e37062e4e 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
@@ -15,27 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.test;
-
-import org.mvnsearch.h2.H2FunctionsLoader;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.test.context.ActiveProfiles;
-
-import javax.annotation.PostConstruct;
-import javax.sql.DataSource;
-
-@ActiveProfiles(value = {"test"})
-@EnableConfigurationProperties
-@ComponentScan(basePackages = "org.apache.inlong.manager")
-public class BaseTest {
-
- @Autowired
- DataSource dataSource;
-
- @PostConstruct
- public void initH2Function() {
- H2FunctionsLoader.loadMysqlFunctions(dataSource);
- }
-}
+package org.apache.inlong.manager.dao.mapper;
+
+import java.util.List;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+
+public interface GroupHeartbeatEntityMapper { // MyBatis mapper for the group_heartbeat table; the SQL for each method lives in mappers/GroupHeartbeatEntityMapper.xml.
+ int deleteByPrimaryKey(Integer id); // Deletes the row whose id column equals the argument.
+
+ int insert(GroupHeartbeatEntityWithBLOBs record); // Inserts component, instance, inlong_group_id, report_time and both heartbeat BLOBs; id, modify_time and create_time are not part of the statement.
+
+ GroupHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id); // Loads one row, BLOB columns included, by id.
+
+ GroupHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
+ @Param("instance") String instance,
+ @Param("inlongGroupId") String inlongGroupId); // Loads the row matching component, instance AND inlong_group_id. Single-object return assumes that triple is unique: confirm against the schema.
+
+ List<GroupHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
+ @Param("instance") String instance); // Loads every group row for the given component and instance, BLOB columns included.
+
+ int updateByKeyWithBLOBs(GroupHeartbeatEntityWithBLOBs record); // Overwrites report_time and both heartbeat BLOBs on rows matching the record's component, instance and inlongGroupId (the id is not used).
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
new file mode 100644
index 000000000..2ab8293e5
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.mapper;
+
+import java.util.List;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+
+public interface StreamHeartbeatEntityMapper { // MyBatis mapper for the stream_heartbeat table; the SQL for each method lives in mappers/StreamHeartbeatEntityMapper.xml.
+ int deleteByPrimaryKey(Integer id); // Deletes the row whose id column equals the argument.
+
+ int insert(StreamHeartbeatEntityWithBLOBs record); // Inserts component, instance, inlong_group_id, inlong_stream_id, report_time and both heartbeat BLOBs; id, modify_time and create_time are not part of the statement.
+
+ StreamHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id); // Loads one row, BLOB columns included, by id.
+
+ StreamHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
+ @Param("instance") String instance,
+ @Param("inlongGroupId") String inlongGroupId,
+ @Param("inlongStreamId") String inlongStreamId); // Loads the row matching all four key columns. Single-object return assumes that combination is unique: confirm against the schema.
+
+ List<StreamHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
+ @Param("instance") String instance,
+ @Param("inlongGroupId") String inlongGroupId); // Loads every stream row for the given component, instance and group, BLOB columns included.
+
+ int updateByKeyWithBLOBs(StreamHeartbeatEntityWithBLOBs record); // Overwrites report_time and both heartbeat BLOBs on rows matching the record's component, instance, inlongGroupId and inlongStreamId (the id is not used).
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index 33073bbf0..e5876c57a 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -239,6 +239,24 @@
enableDeleteByPrimaryKey="true" enableInsert="true"
enableCountByExample="false" enableDeleteByExample="false"
enableSelectByExample="false" enableUpdateByExample="false"/>-->
+ <table tableName="component_heartbeat" domainObjectName="ComponentHeartbeatEntity"
+ enableSelectByPrimaryKey="true"
+ enableUpdateByPrimaryKey="true"
+ enableDeleteByPrimaryKey="true" enableInsert="true"
+ enableCountByExample="false" enableDeleteByExample="false"
+ enableSelectByExample="false" enableUpdateByExample="false"/>
+ <table tableName="group_heartbeat" domainObjectName="GroupHeartbeatEntity"
+ enableSelectByPrimaryKey="true"
+ enableUpdateByPrimaryKey="true"
+ enableDeleteByPrimaryKey="true" enableInsert="true"
+ enableCountByExample="false" enableDeleteByExample="false"
+ enableSelectByExample="false" enableUpdateByExample="false"/>
+ <table tableName="stream_heartbeat" domainObjectName="StreamHeartbeatEntity"
+ enableSelectByPrimaryKey="true"
+ enableUpdateByPrimaryKey="true"
+ enableDeleteByPrimaryKey="true" enableInsert="true"
+ enableCountByExample="false" enableDeleteByExample="false"
+ enableSelectByExample="false" enableUpdateByExample="false"/>
</context>
</generatorConfiguration>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
new file mode 100644
index 000000000..508210e3d
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity"> <!-- Maps the non-BLOB columns of component_heartbeat. -->
+ <id column="id" jdbcType="INTEGER" property="id" />
+ <result column="component" jdbcType="VARCHAR" property="component" />
+ <result column="instance" jdbcType="VARCHAR" property="instance" />
+ <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+ </resultMap>
+ <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs"> <!-- Adds the two LONGVARCHAR heartbeat columns on top of BaseResultMap; every select below uses this map. -->
+ <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
+ <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, component, instance, report_time, modify_time, create_time
+ </sql>
+ <sql id="Blob_Column_List">
+ status_heartbeat, metric_heartbeat
+ </sql>
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs"> <!-- Full row (base + BLOB columns) by id. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from component_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs"> <!-- Full row by (component, instance). NOTE(review): the mapper method passes two @Param arguments, so parameterType="java.lang.String" does not describe the real parameter object; confirm it can be dropped. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from component_heartbeat
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
+ </select>
+ <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs"> <!-- All rows for one component, BLOB columns included. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from component_heartbeat
+ where component = #{component,jdbcType=VARCHAR}
+ </select>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer"> <!-- Removes the row with the given id. -->
+ delete from component_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs"> <!-- Writes key, report time and both heartbeat payloads; id, modify_time and create_time are left out of the column list. -->
+ insert into component_heartbeat (component, instance,
+ report_time, status_heartbeat, metric_heartbeat)
+ values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
+ </insert>
+ <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs"> <!-- Refreshes report time and both payloads for the row(s) matching (component, instance); the id column is not part of the predicate. -->
+ update component_heartbeat
+ set
+ report_time = #{reportTime,jdbcType=TIMESTAMP},
+ status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
+ metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
+ </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
new file mode 100644
index 000000000..bbba5e324
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity"> <!-- Maps the non-BLOB columns of group_heartbeat. -->
+ <id column="id" jdbcType="INTEGER" property="id" />
+ <result column="component" jdbcType="VARCHAR" property="component" />
+ <result column="instance" jdbcType="VARCHAR" property="instance" />
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+ <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+ </resultMap>
+ <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs"> <!-- Adds the two LONGVARCHAR heartbeat columns on top of BaseResultMap; every select below uses this map. -->
+ <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
+ <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, component, instance, inlong_group_id, report_time, modify_time, create_time
+ </sql>
+ <sql id="Blob_Column_List">
+ status_heartbeat, metric_heartbeat
+ </sql>
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs"> <!-- Full row (base + BLOB columns) by id. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from group_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs"> <!-- Full row by (component, instance, inlong_group_id). NOTE(review): the mapper method passes three @Param arguments, so parameterType="java.lang.String" does not describe the real parameter object; confirm it can be dropped. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from group_heartbeat
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+ </select>
+ <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs"> <!-- All group rows for one (component, instance), BLOB columns included. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from group_heartbeat
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
+ </select>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer"> <!-- Removes the row with the given id. -->
+ delete from group_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs"> <!-- Writes key, report time and both heartbeat payloads; id, modify_time and create_time are left out of the column list. -->
+ insert into group_heartbeat (component, instance,
+ inlong_group_id, report_time, status_heartbeat, metric_heartbeat
+ )
+ values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{inlongGroupId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR}
+ )
+ </insert>
+ <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs"> <!-- Refreshes report time and both payloads for the row(s) matching (component, instance, inlong_group_id); the id column is not part of the predicate. -->
+ update group_heartbeat
+ set
+ report_time = #{reportTime,jdbcType=TIMESTAMP},
+ status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
+ metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+ </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
new file mode 100644
index 000000000..c3762875e
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity"> <!-- Maps the non-BLOB columns of stream_heartbeat. -->
+ <id column="id" jdbcType="INTEGER" property="id" />
+ <result column="component" jdbcType="VARCHAR" property="component" />
+ <result column="instance" jdbcType="VARCHAR" property="instance" />
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+ <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+ </resultMap>
+ <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs"> <!-- Adds the two LONGVARCHAR heartbeat columns on top of BaseResultMap; every select below uses this map. -->
+ <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
+ <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, component, instance, inlong_group_id, inlong_stream_id, report_time, modify_time,
+ create_time
+ </sql>
+ <sql id="Blob_Column_List">
+ status_heartbeat, metric_heartbeat
+ </sql>
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs"> <!-- Full row (base + BLOB columns) by id. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from stream_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs"> <!-- Full row by (component, instance, inlong_group_id, inlong_stream_id). NOTE(review): the mapper method passes four @Param arguments, so parameterType="java.lang.String" does not describe the real parameter object; confirm it can be dropped. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from stream_heartbeat
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+ </select>
+ <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs"> <!-- All stream rows for one (component, instance, inlong_group_id), BLOB columns included. -->
+ select
+ <include refid="Base_Column_List" />
+ ,
+ <include refid="Blob_Column_List" />
+ from stream_heartbeat
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+ </select>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer"> <!-- Removes the row with the given id. -->
+ delete from stream_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs"> <!-- Writes key, report time and both heartbeat payloads; id, modify_time and create_time are left out of the column list. -->
+ insert into stream_heartbeat (component, instance,
+ inlong_group_id, inlong_stream_id, report_time, status_heartbeat, metric_heartbeat)
+ values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP},
+ #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
+ </insert>
+ <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs"> <!-- Refreshes report time and both payloads for the row(s) matching (component, instance, inlong_group_id, inlong_stream_id); the id column is not part of the predicate. -->
+ update stream_heartbeat
+ set
+ report_time = #{reportTime,jdbcType=TIMESTAMP},
+ status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
+ metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
+ where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+ </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
new file mode 100644
index 000000000..ea8acf4aa
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+
+/**
+ * Service for reporting and querying component, group and stream heartbeats.
+ */
+public interface HeartbeatService {
+
+ /**
+ * Reports a heartbeat.
+ * @param request heartbeat report request
+ * @return result message of handling the report
+ */
+ String reportHeartbeatInfo(HeartbeatReportRequest request);
+
+ /**
+ * Gets the heartbeat of a single component instance.
+ * @param component component type name
+ * @param instance instance identifier of the component
+ * @return the matching ComponentHeartbeatResponse
+ */
+ ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
+ String instance);
+
+ /**
+ * Gets the heartbeat of one inlong group on a component instance.
+ * @param component component type name
+ * @param instance instance identifier of the component
+ * @param inlongGroupId inlong group id
+ * @return the matching GroupHeartbeatResponse
+ */
+ GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
+ String instance, String inlongGroupId);
+
+ /**
+ * Gets the heartbeat of one inlong stream on a component instance.
+ * @param component component type name
+ * @param instance instance identifier of the component
+ * @param inlongGroupId inlong group id
+ * @param inlongStreamId inlong stream id
+ * @return the matching StreamHeartbeatResponse
+ */
+ StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
+ String instance, String inlongGroupId, String inlongStreamId);
+
+ /**
+ * Gets one page of component heartbeats.
+ * @param component component type name
+ * @param pageNum number of the page to fetch
+ * @param pageSize number of entries per page
+ * @return one page of ComponentHeartbeatResponse
+ */
+ PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component, int pageNum,
+ int pageSize);
+
+ /**
+ * Gets one page of group heartbeats of a component instance.
+ * @param component component type name
+ * @param instance instance identifier of the component
+ * @param pageNum number of the page to fetch
+ * @param pageSize number of entries per page
+ * @return one page of GroupHeartbeatResponse
+ */
+ PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
+ String instance, int pageNum, int pageSize);
+
+ /**
+ * Gets one page of stream heartbeats of an inlong group on a component instance.
+ * @param component component type name
+ * @param instance instance identifier of the component
+ * @param inlongGroupId inlong group id
+ * @param pageNum number of the page to fetch
+ * @param pageSize number of entries per page
+ * @return one page of StreamHeartbeatResponse
+ */
+ PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
+ String instance, String inlongGroupId, int pageNum, int pageSize);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
new file mode 100644
index 000000000..3b46c771f
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import com.google.gson.Gson;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * report or query heartbeat info
+ */
+@Service
+public class HeartbeatServiceImpl
+ implements HeartbeatService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatServiceImpl.class);
+
+ @Autowired
+ private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
+
+ @Autowired
+ private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
+
+ @Autowired
+ private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
+
+ private static Gson gson = new Gson();
+
+ /**
+  * Common entry point for heartbeat reports.
+  *
+  * <p>A request that is null or has an empty component name is logged and ignored.
+  * Every other request is persisted through the default handler, whatever its component type.
+  *
+  * @param request heartbeat report, may be null
+  * @return always the literal "Success"
+  * @throws IllegalArgumentException if the component name is not a {@link ComponentTypeEnum} constant
+  */
+ @Override
+ public String reportHeartbeatInfo(HeartbeatReportRequest request) {
+     if (request == null || StringUtils.isEmpty(request.getComponent())) {
+         // request itself may be null here, so it must not be dereferenced unguarded
+         LOGGER.warn("request is null or component [{}] is not supported",
+                 request == null ? null : request.getComponent());
+         return "Success";
+     }
+     ComponentTypeEnum componentType =
+             ComponentTypeEnum.valueOf(request.getComponent());
+     // All component types share the default handler for now; the switch is the
+     // place to add type-specific handling.
+     switch (componentType) {
+         case Sort:
+         case DataProxy:
+         case Agent:
+         case Cache:
+         default:
+             updateByDefaultWay(request);
+     }
+     return "Success";
+ }
+
+ /**
+  * Stores the component-level heartbeat of a report.
+  *
+  * <p>Does nothing when the request or its component heartbeat is null. Otherwise the
+  * entity is updated through the mapper and inserted when the update affected no row.
+  * Empty status/metric payloads are left unset on the entity.
+  *
+  * @param request heartbeat report
+  */
+ private void updateComponentHeartbeatData(HeartbeatReportRequest request) {
+     boolean nothingToStore = request == null || request.getComponentHeartbeat() == null;
+     if (nothingToStore) {
+         return;
+     }
+
+     ComponentHeartbeatEntityWithBLOBs row = new ComponentHeartbeatEntityWithBLOBs();
+     row.setReportTime(new Date(request.getReportTimestamp()));
+     row.setInstance(request.getInstance());
+     row.setComponent(request.getComponent());
+     if (!StringUtils.isEmpty(request.getComponentHeartbeat().getStatusHeartbeat())) {
+         row.setStatusHeartbeat(request.getComponentHeartbeat().getStatusHeartbeat());
+     }
+     if (!StringUtils.isEmpty(request.getComponentHeartbeat().getMetricHeartbeat())) {
+         row.setMetricHeartbeat(request.getComponentHeartbeat().getMetricHeartbeat());
+     }
+
+     // update first; fall back to insert when no existing row was touched
+     if (componentHeartbeatEntityMapper.updateByKeyWithBLOBs(row) == 0) {
+         componentHeartbeatEntityMapper.insert(row);
+     }
+ }
+
+ /**
+  * Stores every group heartbeat carried by a report.
+  *
+  * <p>For each entry the entity is updated through the mapper and inserted when the
+  * update affected no row. A null list is ignored.
+  *
+  * @param request heartbeat report supplying component, instance, timestamp and group heartbeats
+  */
+ private void updateGroupHeartbeatData(HeartbeatReportRequest request) {
+     List<GroupHeartbeat> groupHeartbeats = request.getGroupHeartbeats();
+     if (groupHeartbeats == null) {
+         return;
+     }
+     for (GroupHeartbeat groupHeartbeat : groupHeartbeats) {
+         GroupHeartbeatEntityWithBLOBs row = new GroupHeartbeatEntityWithBLOBs();
+         row.setComponent(request.getComponent());
+         row.setInstance(request.getInstance());
+         row.setReportTime(new Date(request.getReportTimestamp()));
+         row.setInlongGroupId(groupHeartbeat.getInlongGroupId());
+         row.setStatusHeartbeat(groupHeartbeat.getStatusHeartbeat());
+         row.setMetricHeartbeat(groupHeartbeat.getMetricHeartbeat());
+         // update first; fall back to insert when no existing row was touched
+         if (groupHeartbeatEntityMapper.updateByKeyWithBLOBs(row) == 0) {
+             groupHeartbeatEntityMapper.insert(row);
+         }
+     }
+ }
+
+ /**
+  * Stores every stream heartbeat carried by a report.
+  *
+  * <p>For each entry the entity is updated through the mapper and inserted when the
+  * update affected no row. A null list is ignored.
+  *
+  * @param request heartbeat report supplying component, instance, timestamp and stream heartbeats
+  */
+ private void updateStreamHeartBeatData(HeartbeatReportRequest request) {
+     List<StreamHeartbeat> streamHeartbeats = request.getStreamHeartbeats();
+     if (streamHeartbeats == null) {
+         return;
+     }
+     for (StreamHeartbeat streamHeartbeat : streamHeartbeats) {
+         StreamHeartbeatEntityWithBLOBs row = new StreamHeartbeatEntityWithBLOBs();
+         row.setComponent(request.getComponent());
+         row.setInstance(request.getInstance());
+         row.setReportTime(new Date(request.getReportTimestamp()));
+         row.setInlongGroupId(streamHeartbeat.getInlongGroupId());
+         row.setInlongStreamId(streamHeartbeat.getInlongStreamId());
+         row.setStatusHeartbeat(streamHeartbeat.getStatusHeartbeat());
+         row.setMetricHeartbeat(streamHeartbeat.getMetricHeartbeat());
+         // update first; fall back to insert when no existing row was touched
+         if (streamHeartbeatEntityMapper.updateByKeyWithBLOBs(row) == 0) {
+             streamHeartbeatEntityMapper.insert(row);
+         }
+     }
+ }
+
+ /**
+  * Gets the heartbeat of one component instance.
+  *
+  * @param component component type name; must name a {@link ComponentTypeEnum} constant
+  * @param instance instance identifier
+  * @return the heartbeat found by the default lookup, or null (after a warning is logged)
+  *         when component is null or empty
+  */
+ @Override
+ public ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
+         String instance) {
+     if (StringUtils.isEmpty(component)) {
+         LOGGER.warn("request is null or component type is null");
+         return null;
+     }
+     // All component types share the default lookup for now.
+     switch (ComponentTypeEnum.valueOf(component)) {
+         case Sort:
+         case DataProxy:
+         case Agent:
+         case Cache:
+         default:
+             return getComponentHeartbeatInfoByDefaultWay(component, instance);
+     }
+ }
+
+ /**
+  * Gets the heartbeat of one inlong group on a component instance.
+  *
+  * @param component component type name; must name a {@link ComponentTypeEnum} constant
+  * @param instance instance identifier
+  * @param inlongGroupId inlong group id
+  * @return the heartbeat found by the default lookup, or null (after a warning is logged)
+  *         when component is null or empty
+  */
+ @Override
+ public GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
+         String instance, String inlongGroupId) {
+     if (StringUtils.isEmpty(component)) {
+         LOGGER.warn("request is null or component type is null");
+         return null;
+     }
+     // All component types share the default lookup for now.
+     switch (ComponentTypeEnum.valueOf(component)) {
+         case Sort:
+         case DataProxy:
+         case Agent:
+         case Cache:
+         default:
+             return getGroupHeartbeatByDefaultWay(component, instance, inlongGroupId);
+     }
+ }
+
+ /**
+  * Gets the heartbeat of one inlong stream on a component instance.
+  *
+  * @param component component type name; must name a {@link ComponentTypeEnum} constant
+  * @param instance instance identifier
+  * @param inlongGroupId inlong group id
+  * @param inlongStreamId inlong stream id
+  * @return the heartbeat found by the default lookup, or null (after a warning is logged)
+  *         when component is null or empty
+  */
+ @Override
+ public StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
+         String instance, String inlongGroupId, String inlongStreamId) {
+     if (StringUtils.isEmpty(component)) {
+         LOGGER.warn("request is null or component type is null");
+         return null;
+     }
+     // All component types share the default lookup for now.
+     switch (ComponentTypeEnum.valueOf(component)) {
+         case Sort:
+         case DataProxy:
+         case Agent:
+         case Cache:
+         default:
+             return getStreamHeartbeatByDefaultWay(component, instance,
+                     inlongGroupId, inlongStreamId);
+     }
+ }
+
+ /**
+  * Gets one page of component heartbeats.
+  *
+  * @param component component type name; must name a {@link ComponentTypeEnum} constant
+  * @param pageNum page number
+  * @param pageSize page size
+  * @return the page produced by the default lookup, or null (after a warning is logged)
+  *         when component is null or empty
+  */
+ @Override
+ public PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component,
+         int pageNum, int pageSize) {
+     if (StringUtils.isEmpty(component)) {
+         LOGGER.warn("request is null or component type is null");
+         return null;
+     }
+     // All component types share the default lookup for now.
+     switch (ComponentTypeEnum.valueOf(component)) {
+         case Sort:
+         case DataProxy:
+         case Agent:
+         case Cache:
+         default:
+             return getComponentHeartbeatInfosByDefaultWay(component, pageNum, pageSize);
+     }
+ }
+
+ /**
+  * Gets one page of group heartbeats of a component instance.
+  *
+  * @param component component type name; must name a {@link ComponentTypeEnum} constant
+  * @param instance instance identifier
+  * @param pageNum page number
+  * @param pageSize page size
+  * @return the page produced by the default lookup, or null (after a warning is logged)
+  *         when component is null or empty
+  */
+ @Override
+ public PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
+         String instance, int pageNum, int pageSize) {
+     if (StringUtils.isEmpty(component)) {
+         LOGGER.warn("request is null or component type is null");
+         return null;
+     }
+     // All component types share the default lookup for now.
+     switch (ComponentTypeEnum.valueOf(component)) {
+         case Sort:
+         case DataProxy:
+         case Agent:
+         case Cache:
+         default:
+             return getGroupHeartbeatsByDefaultWay(component, instance,
+                     pageNum, pageSize);
+     }
+ }
+
+ /**
+  * Gets one page of stream heartbeats of an inlong group on a component instance.
+  *
+  * @param component component type name; must name a {@link ComponentTypeEnum} constant
+  * @param instance instance identifier
+  * @param inlongGroupId inlong group id
+  * @param pageNum page number
+  * @param pageSize page size
+  * @return the page produced by the default lookup, or null (after a warning is logged)
+  *         when component is null or empty
+  */
+ @Override
+ public PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
+         String instance, String inlongGroupId, int pageNum, int pageSize) {
+     if (StringUtils.isEmpty(component)) {
+         LOGGER.warn("request is null or component type is null");
+         return null;
+     }
+     // All component types share the default lookup for now.
+     switch (ComponentTypeEnum.valueOf(component)) {
+         case Sort:
+         case DataProxy:
+         case Agent:
+         case Cache:
+         default:
+             return getStreamHeartbeatsByDefaultWay(component, instance,
+                     inlongGroupId, pageNum, pageSize);
+     }
+ }
+
+ /**
+  * Default report handling: stores the component, group and stream heartbeats of the
+  * request, in that order.
+  *
+  * @param request heartbeat report
+  */
+ private void updateByDefaultWay(HeartbeatReportRequest request) {
+     // Serialize the request only when debug logging is actually enabled.
+     boolean debugEnabled = LOGGER.isDebugEnabled();
+     if (debugEnabled) {
+         String requestJson = gson.toJson(request);
+         LOGGER.debug("InlongHeartbeatReportRequest json = {}", requestJson);
+     }
+
+     updateComponentHeartbeatData(request);
+     updateGroupHeartbeatData(request);
+     updateStreamHeartBeatData(request);
+ }
+
+ /**
+  * Default lookup of a component heartbeat by component and instance.
+  *
+  * @param component component type name
+  * @param instance instance identifier
+  * @return response built from the selected entity, or null when the mapper returns no entity
+  */
+ private ComponentHeartbeatResponse getComponentHeartbeatInfoByDefaultWay(String component,
+         String instance) {
+     ComponentHeartbeatEntityWithBLOBs entity =
+             componentHeartbeatEntityMapper.selectByKey(component, instance);
+     if (entity == null) {
+         return null;
+     }
+
+     ComponentHeartbeatResponse response = new ComponentHeartbeatResponse();
+     response.setComponent(entity.getComponent());
+     response.setInstance(entity.getInstance());
+     response.setMetricHeartbeat(entity.getMetricHeartbeat());
+     response.setStatusHeartbeat(entity.getStatusHeartbeat());
+     // the response carries the report time as epoch milliseconds
+     response.setReportTime(entity.getReportTime().getTime());
+     return response;
+ }
+
+ /**
+  * Default lookup of a group heartbeat by component, instance and inlong group id.
+  *
+  * @param component component type name
+  * @param instance instance identifier
+  * @param inlongGroupId inlong group id; also copied into the response as-is
+  * @return response built from the selected entity, or null when the mapper returns no entity
+  */
+ private GroupHeartbeatResponse getGroupHeartbeatByDefaultWay(String component,
+         String instance, String inlongGroupId) {
+     GroupHeartbeatEntityWithBLOBs entity =
+             groupHeartbeatEntityMapper.selectByKey(component, instance, inlongGroupId);
+     if (entity == null) {
+         return null;
+     }
+
+     GroupHeartbeatResponse response = new GroupHeartbeatResponse();
+     // the group id comes from the argument, not from the entity
+     response.setInlongGroupId(inlongGroupId);
+     response.setComponent(entity.getComponent());
+     response.setInstance(entity.getInstance());
+     // the response carries the report time as epoch milliseconds
+     response.setReportTime(entity.getReportTime().getTime());
+     response.setMetricHeartbeat(entity.getMetricHeartbeat());
+     response.setStatusHeartbeat(entity.getStatusHeartbeat());
+     return response;
+ }
+
+ /**
+  * Default lookup of a stream heartbeat by component, instance, group id and stream id.
+  *
+  * @param componentTypeName component type name
+  * @param instanceIdentifier instance identifier
+  * @param inlongGroupId inlong group id
+  * @param inlongStreamId inlong stream id
+  * @return response built from the selected entity, or null when the mapper returns no entity
+  */
+ private StreamHeartbeatResponse getStreamHeartbeatByDefaultWay(String componentTypeName,
+         String instanceIdentifier, String inlongGroupId, String inlongStreamId) {
+     StreamHeartbeatEntityWithBLOBs entity = streamHeartbeatEntityMapper.selectByKey(
+             componentTypeName, instanceIdentifier, inlongGroupId, inlongStreamId);
+     if (entity == null) {
+         return null;
+     }
+
+     StreamHeartbeatResponse response = new StreamHeartbeatResponse();
+     response.setComponent(entity.getComponent());
+     response.setInstance(entity.getInstance());
+     // the response carries the report time as epoch milliseconds
+     response.setReportTime(entity.getReportTime().getTime());
+     response.setInlongGroupId(entity.getInlongGroupId());
+     response.setInlongStreamId(entity.getInlongStreamId());
+     response.setMetricHeartbeat(entity.getMetricHeartbeat());
+     response.setStatusHeartbeat(entity.getStatusHeartbeat());
+     return response;
+ }
+
+ /**
+  * Default paged lookup of the heartbeats selected for a component.
+  *
+  * @param component component type name, must not be null
+  * @param pageNum page number handed to PageHelper
+  * @param pageSize page size handed to PageHelper
+  * @return page of responses copied from the selected entities; its total is taken from
+  *         the query page, or is 0 with an empty list when the mapper returns null
+  */
+ private PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfosByDefaultWay(String component,
+         int pageNum, int pageSize) {
+     Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+     PageHelper.startPage(pageNum, pageSize);
+     Page<ComponentHeartbeatEntityWithBLOBs> rows = (Page<ComponentHeartbeatEntityWithBLOBs>)
+             componentHeartbeatEntityMapper.selectHeartbeats(component);
+
+     List<ComponentHeartbeatResponse> responses;
+     long total;
+     if (rows == null) {
+         responses = new ArrayList<>();
+         total = 0;
+     } else {
+         responses = CommonBeanUtils.copyListProperties(rows, ComponentHeartbeatResponse::new);
+         total = rows.getTotal();
+     }
+
+     PageInfo<ComponentHeartbeatResponse> result = new PageInfo<>(responses);
+     result.setTotal(total);
+     return result;
+ }
+
+ /**
+  * Default paged lookup of the group heartbeats selected for a component instance.
+  *
+  * @param component component type name, must not be null
+  * @param instance instance identifier, must not be null
+  * @param pageNum page number handed to PageHelper
+  * @param pageSize page size handed to PageHelper
+  * @return page of responses copied from the selected entities; its total is taken from
+  *         the query page, or is 0 with an empty list when the mapper returns null
+  */
+ private PageInfo<GroupHeartbeatResponse> getGroupHeartbeatsByDefaultWay(String component,
+         String instance, int pageNum, int pageSize) {
+     Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+     // The instance check must test instance (it used to re-test component, so a null
+     // instance was never rejected).
+     Preconditions.checkNotNull(instance, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+     PageHelper.startPage(pageNum, pageSize);
+     Page<GroupHeartbeatEntityWithBLOBs> entityPage = (Page<GroupHeartbeatEntityWithBLOBs>)
+             groupHeartbeatEntityMapper.selectHeartbeats(component, instance);
+     List<GroupHeartbeatResponse> groupHeartbeatResponses = new ArrayList<>();
+     if (entityPage != null) {
+         groupHeartbeatResponses = CommonBeanUtils
+                 .copyListProperties(entityPage, GroupHeartbeatResponse::new);
+     }
+     PageInfo<GroupHeartbeatResponse> pageInfo = new PageInfo<>(groupHeartbeatResponses);
+     pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+     return pageInfo;
+ }
+
+ /**
+  * Default paged lookup of the stream heartbeats selected for an inlong group on a
+  * component instance.
+  *
+  * @param component component type name, must not be null
+  * @param instance instance identifier, must not be null
+  * @param inlongGroupId inlong group id, must not be null
+  * @param pageNum page number handed to PageHelper
+  * @param pageSize page size handed to PageHelper
+  * @return page of responses copied from the selected entities; its total is taken from
+  *         the query page, or is 0 with an empty list when the mapper returns null
+  */
+ private PageInfo<StreamHeartbeatResponse> getStreamHeartbeatsByDefaultWay(String component,
+         String instance, String inlongGroupId, int pageNum, int pageSize) {
+     Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+     Preconditions.checkNotNull(instance, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+     Preconditions.checkNotNull(inlongGroupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+     PageHelper.startPage(pageNum, pageSize);
+     Page<StreamHeartbeatEntityWithBLOBs> rows = (Page<StreamHeartbeatEntityWithBLOBs>)
+             streamHeartbeatEntityMapper.selectHeartbeats(component, instance, inlongGroupId);
+
+     List<StreamHeartbeatResponse> responses;
+     long total;
+     if (rows == null) {
+         responses = new ArrayList<>();
+         total = 0;
+     } else {
+         responses = CommonBeanUtils.copyListProperties(rows, StreamHeartbeatResponse::new);
+         total = rows.getTotal();
+     }
+
+     PageInfo<StreamHeartbeatResponse> result = new PageInfo<>(responses);
+     result.setTotal(total);
+     return result;
+ }
+}
diff --git a/inlong-manager/manager-test/pom.xml b/inlong-manager/manager-test/pom.xml
index 935b53192..1feec8fa6 100644
--- a/inlong-manager/manager-test/pom.xml
+++ b/inlong-manager/manager-test/pom.xml
@@ -49,15 +49,16 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java b/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
index a588fabd9..a9b9f3a54 100644
--- a/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
+++ b/inlong-manager/manager-test/src/main/java/org/apache/inlong/manager/test/BaseTest.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.test;
+import org.junit.runner.RunWith;
import org.mvnsearch.h2.H2FunctionsLoader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -25,10 +26,12 @@ import org.springframework.test.context.ActiveProfiles;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
+import org.springframework.test.context.junit4.SpringRunner;
@ActiveProfiles(value = {"test"})
@EnableConfigurationProperties
@ComponentScan(basePackages = "org.apache.inlong.manager")
+@RunWith(SpringRunner.class)
public class BaseTest {
@Autowired
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 43075dd08..63ccca42b 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -1061,4 +1061,89 @@ CREATE TABLE `sort_source_config`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
+
+-- ----------------------------
+-- Table structure for config log report
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_config_log`;
+CREATE TABLE `stream_config_log`
+(
+ `ip` varchar(24) NOT NULL COMMENT 'client host ip',
+ `version` varchar(64) DEFAULT NULL COMMENT 'client version',
+ `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong stream ID for consumption',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+ `component_name` varchar(64) NOT NULL DEFAULT '' COMMENT 'current report info component name',
+ `config_name` varchar(64) NOT NULL DEFAULT '' COMMENT 'massage in heartbeat request',
+ `log_type` int(1) DEFAULT 0 COMMENT '0 normal, 1 error',
+ `log_info` text DEFAULT NULL COMMENT 'massage in heartbeat request',
+ `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`ip`, `config_name`, `component_name`, `log_type`, `inlong_stream_id`, `inlong_group_id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
+
+-- ----------------------------
+-- Table structure for component heartbeat
+-- ----------------------------
+DROP TABLE IF EXISTS `component_heartbeat`;
+CREATE TABLE `component_heartbeat`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
+ `status_heartbeat` text NOT NULL COMMENT 'component status info',
+ `metric_heartbeat` text NOT NULL COMMENT 'component metric info',
+ `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_beat_heart` (`component`, `instance`),
+ KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong group heartbeat
+-- ----------------------------
+DROP TABLE IF EXISTS `group_heartbeat`;
+CREATE TABLE `group_heartbeat`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+ `status_heartbeat` text NOT NULL COMMENT 'Inlong group status info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Inlong group metric info',
+ `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
+ KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong stream heartbeat
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_heartbeat`;
+CREATE TABLE `stream_heartbeat`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
+ `status_heartbeat` text NOT NULL COMMENT 'status info',
+ `metric_heartbeat` text NOT NULL COMMENT 'static info',
+ `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
+ KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+-- ----------------------------
+
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index ac4ad652c..f99f2bc87 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1133,5 +1133,67 @@ CREATE TABLE `stream_config_log`
DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
-- ----------------------------
+-- Table structure for component heartbeat
+-- ----------------------------
+DROP TABLE IF EXISTS `component_heartbeat`;
+CREATE TABLE `component_heartbeat`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
+ `status_heartbeat` text NOT NULL COMMENT 'component status info',
+ `metric_heartbeat` text NOT NULL COMMENT 'component metric info',
+ `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_beat_heart` (`component`, `instance`),
+ KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong group heartbeat
+-- ----------------------------
+DROP TABLE IF EXISTS `group_heartbeat`;
+CREATE TABLE `group_heartbeat`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+ `status_heartbeat` text NOT NULL COMMENT 'Inlong group status info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Inlong group metric info',
+ `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
+ KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+
+-- ----------------------------
+-- Table structure for inlong stream heartbeat
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_heartbeat`;
+CREATE TABLE `stream_heartbeat`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
+ `status_heartbeat` text NOT NULL COMMENT 'status info',
+ `metric_heartbeat` text NOT NULL COMMENT 'static info',
+ `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
+ KEY `index_report_time` (`report_time`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
new file mode 100644
index 000000000..4087499c2
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import com.github.pagehelper.PageInfo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * REST endpoints for querying the reported heartbeats of components, inlong groups and inlong streams.
+ *
+ * <p>Every endpoint reads its arguments from the request body, delegates to {@link HeartbeatService}
+ * and answers with a failed {@link Response} when the service returns {@code null}.
+ */
+@RestController
+@RequestMapping("/heartbeat/")
+@Api(tags = "Heartbeat")
+public class HeartbeatInfoController {
+
+    @Autowired
+    private HeartbeatService heartbeatService;
+
+    /** Returns the heartbeat of one component instance, or a failed response if the service returns null. */
+    @RequestMapping(value = "/component/info", method = RequestMethod.POST)
+    @ApiOperation(value = "query component heartbeat")
+    public Response<ComponentHeartbeatResponse> queryComponentHeartbeatInfo(
+            @RequestBody ComponentHeartbeatRequest info) {
+        ComponentHeartbeatResponse heartbeat = heartbeatService.getComponentHeartbeatInfo(
+                info.getComponent(), info.getInstance());
+        if (heartbeat == null) {
+            return Response.fail("Not found msg!");
+        }
+        return Response.success(heartbeat);
+    }
+
+    /** Returns the heartbeat of one inlong group, or a failed response if the service returns null. */
+    @RequestMapping(value = "/group/info", method = RequestMethod.POST)
+    @ApiOperation(value = "query group heartbeat")
+    public Response<GroupHeartbeatResponse> queryGroupHeartbeatInfo(
+            @RequestBody GroupHeartbeatRequest info) {
+        GroupHeartbeatResponse heartbeat = heartbeatService.getGroupHeartbeatInfo(
+                info.getComponent(), info.getInstance(), info.getInlongGroupId());
+        if (heartbeat == null) {
+            return Response.fail("Not found msg!");
+        }
+        return Response.success(heartbeat);
+    }
+
+    /** Returns the heartbeat of one inlong stream, or a failed response if the service returns null. */
+    @RequestMapping(value = "/stream/info", method = RequestMethod.POST)
+    @ApiOperation(value = "query stream heartbeat")
+    public Response<StreamHeartbeatResponse> queryStreamHeartbeat(
+            @RequestBody StreamHeartbeatRequest info) {
+        StreamHeartbeatResponse heartbeat = heartbeatService.getStreamHeartbeatInfo(
+                info.getComponent(), info.getInstance(), info.getInlongGroupId(), info.getInlongStreamId());
+        if (heartbeat == null) {
+            return Response.fail("Not found msg!");
+        }
+        return Response.success(heartbeat);
+    }
+
+    /** Returns one page of component heartbeats, or a failed response if the service returns null. */
+    @RequestMapping(value = "/component/list", method = RequestMethod.POST)
+    @ApiOperation(value = "query component heartbeats")
+    public Response<PageInfo<ComponentHeartbeatResponse>> queryComponentHeartbeatInfos(
+            @RequestBody ComponentHeartbeatPageRequest info) {
+        PageInfo<ComponentHeartbeatResponse> page = heartbeatService.getComponentHeartbeatInfos(
+                info.getComponent(), info.getPageNum(), info.getPageSize());
+        if (page == null) {
+            return Response.fail("Not found msg!");
+        }
+        return Response.success(page);
+    }
+
+    /** Returns one page of group heartbeats, or a failed response if the service returns null. */
+    @RequestMapping(value = "/group/list", method = RequestMethod.POST)
+    @ApiOperation(value = "query group heartbeats")
+    public Response<PageInfo<GroupHeartbeatResponse>> queryGroupHeartbeatInfos(
+            @RequestBody GroupHeartbeatPageRequest info) {
+        PageInfo<GroupHeartbeatResponse> page = heartbeatService.getGroupHeartbeatInfos(
+                info.getComponent(), info.getInstance(), info.getPageNum(), info.getPageSize());
+        if (page == null) {
+            return Response.fail("Not found msg!");
+        }
+        return Response.success(page);
+    }
+
+    /** Returns one page of stream heartbeats, or a failed response if the service returns null. */
+    @RequestMapping(value = "/stream/list", method = RequestMethod.POST)
+    @ApiOperation(value = "query stream heartbeats")
+    public Response<PageInfo<StreamHeartbeatResponse>> queryStreamHeartbeats(
+            @RequestBody StreamHeartbeatPageRequest info) {
+        PageInfo<StreamHeartbeatResponse> page = heartbeatService.getStreamHeartbeatInfos(info.getComponent(),
+                info.getInstance(), info.getInlongGroupId(), info.getPageNum(), info.getPageSize());
+        if (page == null) {
+            return Response.fail("Not found msg!");
+        }
+        return Response.success(page);
+    }
+
+}
+
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
new file mode 100644
index 000000000..ea6cd0e42
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller.openapi;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/** Open API controller whose single endpoint forwards a posted heartbeat report to {@link HeartbeatService}. */
+@RestController
+@RequestMapping("/openapi/heartbeat")
+@Api(tags = "Heartbeat")
+public class HeartbeatController {
+    @Autowired
+    private HeartbeatService heartbeatService;
+
+    @PostMapping(value = "/report",
+            consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
+    @ApiOperation(value = "heartbeat report")
+    public Response<String> reportHeartbeat(@RequestBody HeartbeatReportRequest info) {
+        return Response.success(heartbeatService.reportHeartbeatInfo(info));
+    }
+}
+
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
new file mode 100644
index 000000000..7ed34b8b0
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller.openapi;
+
+import static org.mockito.BDDMockito.given;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.apache.inlong.manager.service.core.impl.HeartbeatServiceImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.test.context.junit4.SpringRunner;
+
+/** Unit tests for {@link HeartbeatServiceImpl} backed by Mockito mocks of the heartbeat mappers. */
+@RunWith(SpringRunner.class)
+public class HeartbeatControllerTest {
+    @InjectMocks
+    private HeartbeatService heartbeatService = new HeartbeatServiceImpl();
+
+    @Mock
+    private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
+
+    @Mock
+    private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
+
+    @Mock
+    private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
+
+    /** Initializes the mocks and stubs each mapper with one canned "Sort" heartbeat row. */
+    @Before
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+        stubComponentMapper();
+        stubGroupMapper();
+        stubStreamMapper();
+    }
+
+    /** Stubs the component mapper so key lookups and listings return one canned row. */
+    private void stubComponentMapper() {
+        ComponentHeartbeatEntityWithBLOBs entity = new ComponentHeartbeatEntityWithBLOBs();
+        entity.setComponent("Sort");
+        entity.setInstance("127.0.0.1");
+        entity.setStatusHeartbeat("[{\"inlongGroupId\":\"groupId\",\"componentStaticInfo\":\"\"}]");
+        entity.setMetricHeartbeat("[{\"inlongGroupId\":\"groupId\",\"streamStatusInfo\":\"\"}]");
+        entity.setReportTime(new Date());
+        Page<ComponentHeartbeatEntityWithBLOBs> page = new Page<>();
+        page.add(entity);
+        page.setTotal(1);
+        given(componentHeartbeatEntityMapper.insert(new ComponentHeartbeatEntityWithBLOBs())).willReturn(1);
+        given(componentHeartbeatEntityMapper.selectByKey(Mockito.anyString(), Mockito.anyString()))
+                .willReturn(entity);
+        given(componentHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString())).willReturn(page);
+    }
+
+    /** Stubs the group mapper so key lookups and listings return one canned row. */
+    private void stubGroupMapper() {
+        GroupHeartbeatEntityWithBLOBs entity = new GroupHeartbeatEntityWithBLOBs();
+        entity.setComponent("Sort");
+        entity.setInstance("127.0.0.1");
+        entity.setStatusHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\": \"10\"},"
+                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        entity.setReportTime(new Date());
+        entity.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\": \"10\"},"
+                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        Page<GroupHeartbeatEntityWithBLOBs> page = new Page<>();
+        page.add(entity);
+        page.setTotal(1);
+        given(groupHeartbeatEntityMapper.selectByKey(
+                Mockito.anyString(), Mockito.anyString(), Mockito.anyString())).willReturn(entity);
+        given(groupHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString(), Mockito.anyString()))
+                .willReturn(page);
+    }
+
+    /** Stubs the stream mapper so key lookups and listings return one canned row. */
+    private void stubStreamMapper() {
+        StreamHeartbeatEntityWithBLOBs entity = new StreamHeartbeatEntityWithBLOBs();
+        entity.setComponent("Sort");
+        entity.setInstance("127.0.0.1");
+        entity.setInlongGroupId("group1");
+        entity.setInlongStreamId("test_test");
+        entity.setStatusHeartbeat("[{\"statue\":\"running\"}]");
+        entity.setMetricHeartbeat("[{\"outMsg\":\"1\",\"inMsg\":2}]");
+        entity.setReportTime(new Date());
+        Page<StreamHeartbeatEntityWithBLOBs> page = new Page<>();
+        page.add(entity);
+        page.setTotal(1);
+        given(streamHeartbeatEntityMapper.selectByKey(Mockito.anyString(), Mockito.anyString(),
+                Mockito.anyString(), Mockito.anyString())).willReturn(entity);
+        given(streamHeartbeatEntityMapper.selectHeartbeats(
+                Mockito.anyString(), Mockito.anyString(), Mockito.anyString())).willReturn(page);
+    }
+
+    /** Reports a heartbeat carrying one group entry and one stream entry and expects "Success". */
+    @Test
+    public void testAddHeartbeat() throws Exception {
+        HeartbeatReportRequest request = new HeartbeatReportRequest();
+        request.setComponent("Sort");
+        request.setInstance("127.0.0.1");
+        request.setReportTimestamp(Instant.now().toEpochMilli());
+
+        // NOTE(review): this component heartbeat is built but never attached to the request;
+        // confirm whether HeartbeatReportRequest offers a setter for it and wire it in.
+        ComponentHeartbeat componentHeartbeat = new ComponentHeartbeat();
+        componentHeartbeat.setMetricHeartbeat("{\"mem\":\"100\"}");
+        componentHeartbeat.setStatusHeartbeat("{\"runningTime\":\"10h.35m\","
+                + "\"status\":\"10h.35m\",\"groupIds\":\"group1,group2\"}");
+
+        GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
+        groupHeartbeat.setInlongGroupId("group1");
+        groupHeartbeat.setStatusHeartbeat("[{\"status\":\"running\",\"streamIds\":\"1,2,3,4\"}]");
+        List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
+        // The entry has to be added, otherwise the request carries an empty group list.
+        groupHeartbeats.add(groupHeartbeat);
+        request.setGroupHeartbeats(groupHeartbeats);
+
+        StreamHeartbeat streamHeartbeat = new StreamHeartbeat();
+        streamHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\": \"10\"},"
+                + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+        streamHeartbeat.setStatusHeartbeat("{}");
+        streamHeartbeat.setInlongGroupId("group1");
+        streamHeartbeat.setInlongStreamId("1");
+        List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
+        streamHeartbeats.add(streamHeartbeat);
+        request.setStreamHeartbeats(streamHeartbeats);
+        Assert.assertEquals("Success", heartbeatService.reportHeartbeatInfo(request));
+    }
+
+    /** Expects the component lookup by key to report instance 127.0.0.1. */
+    @Test
+    public void testQueryComponentHeartbeat() throws Exception {
+        ComponentHeartbeatResponse response = heartbeatService.getComponentHeartbeatInfo("Sort", "127.0.0.1");
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    /** Expects the group lookup by key to report instance 127.0.0.1. */
+    @Test
+    public void testQueryGroupHeartbeat() throws Exception {
+        GroupHeartbeatResponse response = heartbeatService.getGroupHeartbeatInfo("Sort", "127.0.0.1", "group1");
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    /** Expects the stream lookup by key to report instance 127.0.0.1. */
+    @Test
+    public void testQueryStreamHeartbeat() throws Exception {
+        StreamHeartbeatResponse response = heartbeatService.getStreamHeartbeatInfo(
+                "Sort", "127.0.0.1", "group1", "stream1");
+        Assert.assertEquals("127.0.0.1", response.getInstance());
+    }
+
+    /** Expects the paged component listing to report a total of one. */
+    @Test
+    public void testQueryComponentHeartbeatPage() throws Exception {
+        PageInfo<ComponentHeartbeatResponse> page = heartbeatService.getComponentHeartbeatInfos("Sort", 1, 10);
+        Assert.assertEquals(1, page.getTotal());
+    }
+
+    /** Expects the paged group listing to report a total of one. */
+    @Test
+    public void testQueryGroupHeartbeatPage() throws Exception {
+        PageInfo<GroupHeartbeatResponse> page = heartbeatService.getGroupHeartbeatInfos("Sort", "127.0.0.1", 1, 10);
+        Assert.assertEquals(1, page.getTotal());
+    }
+
+    /** Expects the paged stream listing to report a total of one. */
+    @Test
+    public void testQueryStreamHeartbeatPage() throws Exception {
+        PageInfo<StreamHeartbeatResponse> page = heartbeatService.getStreamHeartbeatInfos(
+                "Sort", "127.0.0.1", "group1", 1, 10);
+        Assert.assertEquals(1, page.getTotal());
+    }
+}
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 4c3e6b0e1..7ae509f9f 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -127,6 +127,13 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
+<!-- <dependencies>-->
+<!-- <dependency>-->
+<!-- <groupId>org.apache.maven.surefire</groupId>-->
+<!-- <artifactId>surefire-junit47</artifactId>-->
+<!-- <version>${plugin.surefire.version}</version>-->
+<!-- </dependency>-->
+<!-- </dependencies>-->
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>