You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/28 11:39:37 UTC
[incubator-inlong] 06/07: [INLONG-4000][Manager] Refactor the implementations of heartbeat interfaces (#4002)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 05820029e108c37ac951e26e81b71bf6668ef07d
Author: healchow <he...@gmail.com>
AuthorDate: Thu Apr 28 15:23:30 2022 +0800
[INLONG-4000][Manager] Refactor the implementations of heartbeat interfaces (#4002)
* [INLONG-4000][Manager] Refactor the implementations of heartbeat interfaces
* [INLONG-4000][Manager] Remove unused imports
---
.../inlong/common/enums/ComponentTypeEnum.java | 10 +-
.../common/pojo/heartbeat/ComponentHeartbeat.java | 15 +-
.../heartbeat/ComponentHeartbeatPageRequest.java | 36 --
.../pojo/heartbeat/ComponentHeartbeatRequest.java | 37 --
.../pojo/heartbeat/ComponentHeartbeatResponse.java | 30 +-
.../common/pojo/heartbeat/GroupHeartbeat.java | 16 +-
.../pojo/heartbeat/GroupHeartbeatPageRequest.java | 38 --
.../pojo/heartbeat/GroupHeartbeatRequest.java | 40 --
.../pojo/heartbeat/GroupHeartbeatResponse.java | 33 +-
...tPageRequest.java => HeartbeatPageRequest.java} | 24 +-
...beatRequest.java => HeartbeatQueryRequest.java} | 20 +-
.../pojo/heartbeat/HeartbeatReportRequest.java | 34 +-
.../common/pojo/heartbeat/StreamHeartbeat.java | 17 +-
.../pojo/heartbeat/StreamHeartbeatResponse.java | 35 +-
.../dao/entity/ComponentHeartbeatEntity.java | 19 +-
.../entity/ComponentHeartbeatEntityWithBLOBs.java | 30 --
.../manager/dao/entity/GroupHeartbeatEntity.java | 18 +-
.../dao/entity/GroupHeartbeatEntityWithBLOBs.java | 31 --
.../manager/dao/entity/StreamHeartbeatEntity.java | 21 +-
.../dao/entity/StreamHeartbeatEntityWithBLOBs.java | 30 --
.../dao/mapper/ComponentHeartbeatEntityMapper.java | 23 +-
.../dao/mapper/GroupHeartbeatEntityMapper.java | 26 +-
.../dao/mapper/StreamHeartbeatEntityMapper.java | 30 +-
.../mappers/ComponentHeartbeatEntityMapper.xml | 130 ++---
.../mappers/GroupHeartbeatEntityMapper.xml | 148 +++---
.../mappers/StreamHeartbeatEntityMapper.xml | 156 +++---
.../manager/service/core/HeartbeatService.java | 84 ++-
.../service/core/impl/HeartbeatServiceImpl.java | 576 +++++++--------------
.../service/core/impl/HeartbeatServiceTest.java | 213 ++++++++
.../main/resources/sql/apache_inlong_manager.sql | 95 ++--
.../manager-web/sql/apache_inlong_manager.sql | 76 ++-
.../web/controller/HeartbeatController.java | 82 +++
.../web/controller/HeartbeatInfoController.java | 133 -----
...ontroller.java => OpenHeartbeatController.java} | 15 +-
.../openapi/HeartbeatControllerTest.java | 210 --------
35 files changed, 1055 insertions(+), 1476 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
index 3361ea99a..9a2330c8f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
@@ -21,16 +21,16 @@ import lombok.Getter;
public enum ComponentTypeEnum {
- Agent("Inlong-Agent"),
+ Agent("Agent"),
- DataProxy("Inlong-DataProxy"),
+ DataProxy("DataProxy"),
- Cache("Inlong-Cache"),
+ Cache("Cache"),
- Sort("Inlong-Sort");
+ Sort("Sort");
@Getter
- private String name;
+ private final String name;
ComponentTypeEnum(String name) {
this.name = name;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
index 61cc41911..7b9510877 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeat.java
@@ -17,14 +17,21 @@
package org.apache.inlong.manager.common.pojo.heartbeat;
-import lombok.Getter;
-import lombok.Setter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-@Getter
-@Setter
+/**
+ * Component heartbeat info
+ */
+@Data
+@ApiModel("Component heartbeat info")
public class ComponentHeartbeat {
+ @ApiModelProperty(value = "Status heartbeat info")
private String statusHeartbeat;
+ @ApiModelProperty(value = "Metric heartbeat info")
private String metricHeartbeat;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
deleted file mode 100644
index 06d95756f..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatPageRequest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.inlong.manager.common.beans.PageRequest;
-
-@Getter
-@Setter
-@Data
-@ApiModel("Inlong component heartbeats query request")
-public class ComponentHeartbeatPageRequest
- extends PageRequest {
-
- @ApiModelProperty
- private String component;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
deleted file mode 100644
index bb2dd27c4..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatRequest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-
-@Getter
-@Setter
-@Data
-@ApiModel("Inlong component heartbeat query request")
-public class ComponentHeartbeatRequest {
-
- @ApiModelProperty
- private String component;
-
- @ApiModelProperty
- private String instance;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
index 68e944a7e..d70976512 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/ComponentHeartbeatResponse.java
@@ -17,30 +17,36 @@
package org.apache.inlong.manager.common.pojo.heartbeat;
+import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-@Getter
-@Setter
+import java.util.Date;
+
+/**
+ * Component heartbeat response
+ */
@Data
-@ApiModel("Inlong component heartbeat query response")
+@ApiModel("Component heartbeat response")
public class ComponentHeartbeatResponse {
- @ApiModelProperty
+ @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
private String component;
- @ApiModelProperty
+ @ApiModelProperty(value = "Component instance, can be ip, name...")
private String instance;
- @ApiModelProperty
- private long reportTime;
-
- @ApiModelProperty
+ @ApiModelProperty(value = "Stream status heartbeat")
private String statusHeartbeat;
- @ApiModelProperty
+ @ApiModelProperty(value = "Stream metric heartbeat")
private String metricHeartbeat;
+
+ @ApiModelProperty(value = "Report time of heartbeat")
+ private Long reportTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
index f2e81f9ed..ab96946ea 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeat.java
@@ -17,16 +17,24 @@
package org.apache.inlong.manager.common.pojo.heartbeat;
-import lombok.Getter;
-import lombok.Setter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-@Getter
-@Setter
+/**
+ * Inlong group heartbeat info
+ */
+@Data
+@ApiModel("Inlong group heartbeat info")
public class GroupHeartbeat {
+ @ApiModelProperty(value = "Inlong group id")
private String inlongGroupId;
+ @ApiModelProperty(value = "Status heartbeat info")
private String statusHeartbeat;
+ @ApiModelProperty(value = "Metric heartbeat info")
private String metricHeartbeat;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
deleted file mode 100644
index 23e72f3d0..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatPageRequest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.inlong.manager.common.beans.PageRequest;
-
-@Data
-@Getter
-@Setter
-@ApiModel("Inlong group heartbeats request")
-public class GroupHeartbeatPageRequest extends PageRequest {
-
- @ApiModelProperty
- private String component;
-
- @ApiModelProperty
- private String instance;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
deleted file mode 100644
index 79ec29a78..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatRequest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.heartbeat;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-
-@Data
-@Getter
-@Setter
-@ApiModel("Inlong group heartbeat request")
-public class GroupHeartbeatRequest {
-
- @ApiModelProperty
- private String component;
-
- @ApiModelProperty
- private String instance;
-
- @ApiModelProperty
- private String inlongGroupId;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
index ed3ad688e..c5552ab82 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/GroupHeartbeatResponse.java
@@ -17,33 +17,42 @@
package org.apache.inlong.manager.common.pojo.heartbeat;
+import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
+import java.util.Date;
+
+/**
+ * Inlong group heartbeat response
+ */
@Data
-@Getter
-@Setter
@ApiModel("Inlong group heartbeat response")
public class GroupHeartbeatResponse {
- @ApiModelProperty
+ @ApiModelProperty(value = "Primary key")
+ private Integer id;
+
+ @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
private String component;
- @ApiModelProperty
+ @ApiModelProperty(value = "Component instance, can be ip, name...")
private String instance;
- @ApiModelProperty
- private long reportTime;
-
- @ApiModelProperty
+ @ApiModelProperty(value = "Inlong group id")
private String inlongGroupId;
- @ApiModelProperty
+ @ApiModelProperty(value = "Stream status heartbeat")
private String statusHeartbeat;
- @ApiModelProperty
+ @ApiModelProperty(value = "Stream metric heartbeat")
private String metricHeartbeat;
+
+ @ApiModelProperty(value = "Report time of heartbeat")
+ private Long reportTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatPageRequest.java
similarity index 69%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatPageRequest.java
index 08ead5382..ad7f4e865 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatPageRequest.java
@@ -20,23 +20,27 @@ package org.apache.inlong.manager.common.pojo.heartbeat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
+import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.beans.PageRequest;
-@Getter
-@Setter
+/**
+ * Base heartbeat page request
+ */
@Data
-@ApiModel("Inlong stream heartbeats query request")
-public class StreamHeartbeatPageRequest
- extends PageRequest {
+@EqualsAndHashCode(callSuper = false)
+@ApiModel("Base heartbeat page request")
+public class HeartbeatPageRequest extends PageRequest {
- @ApiModelProperty
+ @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
private String component;
- @ApiModelProperty
+ @ApiModelProperty(value = "Component instance, can be ip, name...")
private String instance;
- @ApiModelProperty
+ @ApiModelProperty(value = "Inlong group id")
private String inlongGroupId;
+
+ @ApiModelProperty(value = "Inlong stream id")
+ private String inlongStreamId;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatQueryRequest.java
similarity index 75%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatQueryRequest.java
index c5420a0b1..16afdeae5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatQueryRequest.java
@@ -20,24 +20,24 @@ package org.apache.inlong.manager.common.pojo.heartbeat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-@Getter
-@Setter
+/**
+ * Heartbeat query request
+ */
@Data
-@ApiModel("Inlong stream heartbeat query request")
-public class StreamHeartbeatRequest {
+@ApiModel("Heartbeat query request")
+public class HeartbeatQueryRequest {
- @ApiModelProperty
+ @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
private String component;
- @ApiModelProperty
+ @ApiModelProperty(value = "Component instance, can be ip, name...")
private String instance;
- @ApiModelProperty
+ @ApiModelProperty(value = "Inlong group id")
private String inlongGroupId;
- @ApiModelProperty
+ @ApiModelProperty(value = "Inlong stream id")
private String inlongStreamId;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
index 7f9bed089..cdd262439 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/HeartbeatReportRequest.java
@@ -17,32 +17,38 @@
package org.apache.inlong.manager.common.pojo.heartbeat;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import javax.validation.constraints.NotBlank;
import java.util.List;
-import lombok.Getter;
-import lombok.Setter;
-@Getter
-@Setter
+/**
+ * Request of heartbeat report
+ */
+@Data
+@ApiModel("Request of heartbeat report")
public class HeartbeatReportRequest {
+ @NotBlank
+ @ApiModelProperty(value = "Component name, such as: Agent, Sort...", required = true)
private String component;
+ @NotBlank
+ @ApiModelProperty(value = "Component instance, can be ip, name...", required = true)
private String instance;
- private long reportTimestamp;
+ @ApiModelProperty(value = "Report timestamp", required = true)
+ private Long reportTime;
- /*
- * component
- */
+ @ApiModelProperty(value = "Component heartbeat info")
private ComponentHeartbeat componentHeartbeat;
- /*
- * group
- */
+ @ApiModelProperty(value = "Group heartbeat list")
private List<GroupHeartbeat> groupHeartbeats;
- /*
- * stream
- */
+ @ApiModelProperty(value = "Stream heartbeat list")
private List<StreamHeartbeat> streamHeartbeats;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
index 391c62221..3f0b616f3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeat.java
@@ -17,18 +17,27 @@
package org.apache.inlong.manager.common.pojo.heartbeat;
-import lombok.Getter;
-import lombok.Setter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-@Getter
-@Setter
+/**
+ * Inlong stream heartbeat info
+ */
+@Data
+@ApiModel("Inlong stream heartbeat info")
public class StreamHeartbeat {
+ @ApiModelProperty(value = "Inlong group id")
private String inlongGroupId;
+ @ApiModelProperty(value = "Inlong stream id")
private String inlongStreamId;
+ @ApiModelProperty(value = "Status heartbeat info")
private String statusHeartbeat;
+ @ApiModelProperty(value = "Metric heartbeat info")
private String metricHeartbeat;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
index 4b808f090..4f2cbb31c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/heartbeat/StreamHeartbeatResponse.java
@@ -17,36 +17,45 @@
package org.apache.inlong.manager.common.pojo.heartbeat;
+import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-@Getter
-@Setter
+import java.util.Date;
+
+/**
+ * Inlong stream heartbeat response
+ */
@Data
@ApiModel("Inlong stream heartbeat response")
public class StreamHeartbeatResponse {
- @ApiModelProperty
+ @ApiModelProperty(value = "Primary key")
+ private Integer id;
+
+ @ApiModelProperty(value = "Component name, such as: Agent, Sort...")
private String component;
- @ApiModelProperty
+ @ApiModelProperty(value = "Component instance, can be ip, name...")
private String instance;
- @ApiModelProperty
- private long reportTime;
-
- @ApiModelProperty
+ @ApiModelProperty(value = "Inlong group id")
private String inlongGroupId;
- @ApiModelProperty
+ @ApiModelProperty(value = "Inlong stream id")
private String inlongStreamId;
- @ApiModelProperty
+ @ApiModelProperty(value = "Stream status heartbeat")
private String statusHeartbeat;
- @ApiModelProperty
+ @ApiModelProperty(value = "Stream metric heartbeat")
private String metricHeartbeat;
+
+ @ApiModelProperty(value = "Report time of heartbeat")
+ private Long reportTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
index b2d8ccca5..b5406d1ec 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntity.java
@@ -17,23 +17,22 @@
package org.apache.inlong.manager.dao.entity;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.Date;
-import lombok.Data;
@Data
public class ComponentHeartbeatEntity implements Serializable {
- private Integer id;
+ private static final long serialVersionUID = 1L;
+ private Integer id;
private String component;
-
private String instance;
-
- private Date reportTime;
-
- private Date modifyTime;
-
+ private String statusHeartbeat;
+ private String metricHeartbeat;
+ private Long reportTime;
private Date createTime;
+ private Date modifyTime;
- private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
deleted file mode 100644
index a9eaa8691..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ComponentHeartbeatEntityWithBLOBs.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import lombok.Data;
-
-@Data
-public class ComponentHeartbeatEntityWithBLOBs extends ComponentHeartbeatEntity implements Serializable {
- private String statusHeartbeat;
-
- private String metricHeartbeat;
-
- private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
index 230202497..c054cef03 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntity.java
@@ -17,25 +17,23 @@
package org.apache.inlong.manager.dao.entity;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.Date;
-import lombok.Data;
@Data
public class GroupHeartbeatEntity implements Serializable {
- private Integer id;
+ private static final long serialVersionUID = 1L;
+ private Integer id;
private String component;
-
private String instance;
-
private String inlongGroupId;
-
- private Date reportTime;
-
- private Date modifyTime;
-
+ private String statusHeartbeat;
+ private String metricHeartbeat;
+ private Long reportTime;
private Date createTime;
+ private Date modifyTime;
- private static final long serialVersionUID = 1L;
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
deleted file mode 100644
index f747cde3f..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/GroupHeartbeatEntityWithBLOBs.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import lombok.Data;
-
-@Data
-public class GroupHeartbeatEntityWithBLOBs extends GroupHeartbeatEntity implements Serializable {
-
- private String statusHeartbeat;
-
- private String metricHeartbeat;
-
- private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
index 8ee2255c9..6e74b885e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntity.java
@@ -17,27 +17,24 @@
package org.apache.inlong.manager.dao.entity;
+import lombok.Data;
+
import java.io.Serializable;
import java.util.Date;
-import lombok.Data;
@Data
public class StreamHeartbeatEntity implements Serializable {
- private Integer id;
+ private static final long serialVersionUID = 1L;
+ private Integer id;
private String component;
-
private String instance;
-
private String inlongGroupId;
-
private String inlongStreamId;
-
- private Date reportTime;
-
- private Date modifyTime;
-
+ private String statusHeartbeat;
+ private String metricHeartbeat;
+ private Long reportTime;
private Date createTime;
+ private Date modifyTime;
- private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
+}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
deleted file mode 100644
index 83ceb1633..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamHeartbeatEntityWithBLOBs.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import java.io.Serializable;
-import lombok.Data;
-
-@Data
-public class StreamHeartbeatEntityWithBLOBs extends StreamHeartbeatEntity implements Serializable {
- private String statusHeartbeat;
-
- private String metricHeartbeat;
-
- private static final long serialVersionUID = 1L;
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
index 2e653dc7e..8ebf1bceb 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ComponentHeartbeatEntityMapper.java
@@ -17,21 +17,26 @@
package org.apache.inlong.manager.dao.mapper;
-import java.util.List;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+@Repository
public interface ComponentHeartbeatEntityMapper {
- int deleteByPrimaryKey(Integer id);
- int insert(ComponentHeartbeatEntityWithBLOBs record);
+ int insert(ComponentHeartbeatEntity record);
- ComponentHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+ int insertOrUpdateByKey(ComponentHeartbeatEntity record);
- ComponentHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
- @Param("instance") String instance);
+ ComponentHeartbeatEntity selectByPrimaryKey(Integer id);
- List<ComponentHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component);
+ ComponentHeartbeatEntity selectByKey(@Param("component") String component, @Param("instance") String instance);
+
+ List<ComponentHeartbeatEntity> selectByCondition(@Param("request") HeartbeatPageRequest request);
+
+ int deleteByPrimaryKey(Integer id);
- int updateByKeyWithBLOBs(ComponentHeartbeatEntityWithBLOBs record);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
index e37062e4e..cbe0a9ce9 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/GroupHeartbeatEntityMapper.java
@@ -17,23 +17,29 @@
package org.apache.inlong.manager.dao.mapper;
-import java.util.List;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+@Repository
public interface GroupHeartbeatEntityMapper {
- int deleteByPrimaryKey(Integer id);
- int insert(GroupHeartbeatEntityWithBLOBs record);
+ int insert(GroupHeartbeatEntity record);
- GroupHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+ int insertOrUpdateAll(@Param("component") String component, @Param("instance") String instance,
+ @Param("reportTime") Long reportTime, @Param("list") List<GroupHeartbeat> list);
- GroupHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
- @Param("instance") String instance,
+ GroupHeartbeatEntity selectByPrimaryKey(Integer id);
+
+ GroupHeartbeatEntity selectByKey(@Param("component") String component, @Param("instance") String instance,
@Param("inlongGroupId") String inlongGroupId);
- List<GroupHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
- @Param("instance") String instance);
+ List<GroupHeartbeatEntity> selectByCondition(@Param("request") HeartbeatPageRequest request);
+
+ int deleteByPrimaryKey(Integer id);
- int updateByKeyWithBLOBs(GroupHeartbeatEntityWithBLOBs record);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
index 2ab8293e5..1af26e3ed 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamHeartbeatEntityMapper.java
@@ -17,25 +17,29 @@
package org.apache.inlong.manager.dao.mapper;
-import java.util.List;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+@Repository
public interface StreamHeartbeatEntityMapper {
- int deleteByPrimaryKey(Integer id);
- int insert(StreamHeartbeatEntityWithBLOBs record);
+ int insert(StreamHeartbeatEntity record);
- StreamHeartbeatEntityWithBLOBs selectByPrimaryKey(Integer id);
+ int insertOrUpdateAll(@Param("component") String component, @Param("instance") String instance,
+ @Param("reportTime") Long reportTime, @Param("list") List<StreamHeartbeat> list);
- StreamHeartbeatEntityWithBLOBs selectByKey(@Param("component") String component,
- @Param("instance") String instance,
- @Param("inlongGroupId") String inlongGroupId,
- @Param("inlongStreamId") String inlongStreamId);
+ StreamHeartbeatEntity selectByPrimaryKey(Integer id);
- List<StreamHeartbeatEntityWithBLOBs> selectHeartbeats(@Param("component") String component,
- @Param("instance") String instance,
- @Param("inlongGroupId") String inlongGroupId);
+ StreamHeartbeatEntity selectByKey(@Param("component") String component, @Param("instance") String instance,
+ @Param("inlongGroupId") String inlongGroupId, @Param("inlongStreamId") String inlongStreamId);
+
+ List<StreamHeartbeatEntity> selectByCondition(@Param("request") HeartbeatPageRequest request);
+
+ int deleteByPrimaryKey(Integer id);
- int updateByKeyWithBLOBs(StreamHeartbeatEntityWithBLOBs record);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
index 508210e3d..abae727b1 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ComponentHeartbeatEntityMapper.xml
@@ -1,83 +1,85 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
+ or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
+ regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
+ with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
+ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
- <id column="id" jdbcType="INTEGER" property="id" />
- <result column="component" jdbcType="VARCHAR" property="component" />
- <result column="instance" jdbcType="VARCHAR" property="instance" />
- <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
- <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
- <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
- </resultMap>
- <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
- <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
- <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
- </resultMap>
- <sql id="Base_Column_List">
- id, component, instance, report_time, modify_time, create_time
- </sql>
- <sql id="Blob_Column_List">
- status_heartbeat, metric_heartbeat
- </sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from component_heartbeat
- where id = #{id,jdbcType=INTEGER}
- </select>
- <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from component_heartbeat
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
- </select>
- <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from component_heartbeat
- where component = #{component,jdbcType=VARCHAR}
- </select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete from component_heartbeat
- where id = #{id,jdbcType=INTEGER}
- </delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
- insert into component_heartbeat (component, instance,
- report_time, status_heartbeat, metric_heartbeat)
- values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
- #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
- </insert>
- <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs">
- update component_heartbeat
- set
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
- metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
- </update>
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="component" jdbcType="VARCHAR" property="component"/>
+ <result column="instance" jdbcType="VARCHAR" property="instance"/>
+ <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat"/>
+ <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat"/>
+ <result column="report_time" jdbcType="BIGINT" property="reportTime"/>
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, component, instance, status_heartbeat, metric_heartbeat, report_time, create_time, modify_time
+ </sql>
+
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+ insert into component_heartbeat (component, instance,
+ status_heartbeat, metric_heartbeat,
+ report_time)
+ values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR},
+ #{reportTime,jdbcType=BIGINT})
+ </insert>
+ <insert id="insertOrUpdateByKey" parameterType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+ insert into component_heartbeat (component, instance,
+ status_heartbeat, metric_heartbeat,
+ report_time)
+ values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR},
+ #{reportTime,jdbcType=BIGINT})
+ ON DUPLICATE KEY UPDATE status_heartbeat = values(status_heartbeat),
+ metric_heartbeat = values(metric_heartbeat),
+ report_time = values(report_time)
+ </insert>
+
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from component_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByKey" parameterType="java.lang.String" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from component_heartbeat
+ where component = #{component,jdbcType=VARCHAR}
+ and instance = #{instance,jdbcType=VARCHAR}
+ </select>
+ <select id="selectByCondition"
+ parameterType="org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest"
+ resultType="org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from component_heartbeat
+ where component = #{request.component, jdbcType=VARCHAR}
+ order by modify_time desc
+ </select>
+
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete
+ from component_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
index bbba5e324..fa258eb47 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/GroupHeartbeatEntityMapper.xml
@@ -1,86 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
+ or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
+ regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
+ with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
+ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
- <id column="id" jdbcType="INTEGER" property="id" />
- <result column="component" jdbcType="VARCHAR" property="component" />
- <result column="instance" jdbcType="VARCHAR" property="instance" />
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
- <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
- <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
- <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
- </resultMap>
- <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
- <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
- <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
- </resultMap>
- <sql id="Base_Column_List">
- id, component, instance, inlong_group_id, report_time, modify_time, create_time
- </sql>
- <sql id="Blob_Column_List">
- status_heartbeat, metric_heartbeat
- </sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from group_heartbeat
- where id = #{id,jdbcType=INTEGER}
- </select>
- <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from group_heartbeat
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
- </select>
- <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from group_heartbeat
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR}
- </select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete from group_heartbeat
- where id = #{id,jdbcType=INTEGER}
- </delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
- insert into group_heartbeat (component, instance,
- inlong_group_id, report_time, status_heartbeat, metric_heartbeat
- )
- values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
- #{inlongGroupId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP}, #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR}
- )
- </insert>
- <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs">
- update group_heartbeat
- set
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
- metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
- </update>
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="component" jdbcType="VARCHAR" property="component"/>
+ <result column="instance" jdbcType="VARCHAR" property="instance"/>
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+ <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat"/>
+ <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat"/>
+ <result column="report_time" jdbcType="BIGINT" property="reportTime"/>
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, component, instance, inlong_group_id, status_heartbeat, metric_heartbeat,
+ report_time, create_time, modify_time
+ </sql>
+
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
+ insert into group_heartbeat (component, instance,
+ inlong_group_id, status_heartbeat,
+ metric_heartbeat, report_time)
+ values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{inlongGroupId,jdbcType=VARCHAR}, #{statusHeartbeat,jdbcType=LONGVARCHAR},
+ #{metricHeartbeat,jdbcType=LONGVARCHAR}, #{reportTime,jdbcType=BIGINT})
+ </insert>
+ <insert id="insertOrUpdateAll" parameterType="java.util.List">
+ insert into group_heartbeat (component, instance,
+ inlong_group_id, status_heartbeat,
+ metric_heartbeat, report_time)
+ values
+ <foreach collection="list" index="index" item="item" open="" close="" separator=",">
+ (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.statusHeartbeat,jdbcType=LONGVARCHAR},
+ #{item.metricHeartbeat,jdbcType=LONGVARCHAR}, #{reportTime,jdbcType=BIGINT})
+ </foreach>
+ ON DUPLICATE KEY UPDATE
+ status_heartbeat = values(status_heartbeat),
+ metric_heartbeat = values(metric_heartbeat),
+ report_time = values(report_time)
+ </insert>
+
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from group_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByKey" parameterType="java.lang.String" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from group_heartbeat
+ where component = #{component,jdbcType=VARCHAR}
+ and instance = #{instance,jdbcType=VARCHAR}
+ and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+ </select>
+ <select id="selectByCondition"
+ parameterType="org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest"
+ resultType="org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from group_heartbeat
+ <where>
+ component = #{request.component, jdbcType=VARCHAR}
+ <if test="request.inlongGroupId != null and request.inlongGroupId != ''">
+ and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
+ </if>
+ <if test="request.instance != null and request.instance != ''">
+ and instance = #{request.instance, jdbcType=VARCHAR}
+ </if>
+ order by modify_time desc
+ </where>
+ </select>
+
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete
+ from group_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
index c3762875e..a22bfdcaf 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamHeartbeatEntityMapper.xml
@@ -1,87 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
+ or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
+ regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
+ with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
+ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper">
- <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
- <id column="id" jdbcType="INTEGER" property="id" />
- <result column="component" jdbcType="VARCHAR" property="component" />
- <result column="instance" jdbcType="VARCHAR" property="instance" />
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
- <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
- <result column="report_time" jdbcType="TIMESTAMP" property="reportTime" />
- <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
- <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
- </resultMap>
- <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
- <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat" />
- <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat" />
- </resultMap>
- <sql id="Base_Column_List">
- id, component, instance, inlong_group_id, inlong_stream_id, report_time, modify_time,
- create_time
- </sql>
- <sql id="Blob_Column_List">
- status_heartbeat, metric_heartbeat
- </sql>
- <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from stream_heartbeat
- where id = #{id,jdbcType=INTEGER}
- </select>
- <select id="selectByKey" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from stream_heartbeat
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
- </select>
- <select id="selectHeartbeats" parameterType="java.lang.String" resultMap="ResultMapWithBLOBs">
- select
- <include refid="Base_Column_List" />
- ,
- <include refid="Blob_Column_List" />
- from stream_heartbeat
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
- </select>
- <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
- delete from stream_heartbeat
- where id = #{id,jdbcType=INTEGER}
- </delete>
- <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
- insert into stream_heartbeat (component, instance,
- inlong_group_id, inlong_stream_id, report_time, status_heartbeat, metric_heartbeat)
- values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
- #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{reportTime,jdbcType=TIMESTAMP},
- #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR})
- </insert>
- <update id="updateByKeyWithBLOBs" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs">
- update stream_heartbeat
- set
- report_time = #{reportTime,jdbcType=TIMESTAMP},
- status_heartbeat = #{statusHeartbeat,jdbcType=LONGVARCHAR},
- metric_heartbeat = #{metricHeartbeat,jdbcType=LONGVARCHAR}
- where component = #{component,jdbcType=VARCHAR} and instance = #{instance,jdbcType=VARCHAR} and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR} and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
- </update>
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="component" jdbcType="VARCHAR" property="component"/>
+ <result column="instance" jdbcType="VARCHAR" property="instance"/>
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+ <result column="status_heartbeat" jdbcType="LONGVARCHAR" property="statusHeartbeat"/>
+ <result column="metric_heartbeat" jdbcType="LONGVARCHAR" property="metricHeartbeat"/>
+ <result column="report_time" jdbcType="BIGINT" property="reportTime"/>
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, component, instance, inlong_group_id, inlong_stream_id, status_heartbeat, metric_heartbeat,
+ report_time, create_time, modify_time
+ </sql>
+
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+ insert into stream_heartbeat (component, instance,
+ inlong_group_id, inlong_stream_id,
+ status_heartbeat, metric_heartbeat,
+ report_time, create_time)
+ values (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ #{statusHeartbeat,jdbcType=LONGVARCHAR}, #{metricHeartbeat,jdbcType=LONGVARCHAR},
+ #{reportTime,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP})
+ </insert>
+ <insert id="insertOrUpdateAll" parameterType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+ insert into stream_heartbeat (component, instance,
+ inlong_group_id, inlong_stream_id,
+ status_heartbeat, metric_heartbeat,
+ report_time)
+ values
+ <foreach collection="list" index="index" item="item" open="" close="" separator=",">
+ (#{component,jdbcType=VARCHAR}, #{instance,jdbcType=VARCHAR},
+ #{item.inlongGroupId,jdbcType=VARCHAR}, #{item.inlongStreamId,jdbcType=VARCHAR},
+ #{item.statusHeartbeat,jdbcType=LONGVARCHAR}, #{item.metricHeartbeat,jdbcType=LONGVARCHAR},
+ #{reportTime,jdbcType=BIGINT})
+ </foreach>
+ ON DUPLICATE KEY UPDATE
+ status_heartbeat = values(status_heartbeat),
+ metric_heartbeat = values(metric_heartbeat),
+ report_time = values(report_time)
+ </insert>
+
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByKey" parameterType="java.lang.String" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_heartbeat
+ where component = #{component,jdbcType=VARCHAR}
+ and instance = #{instance,jdbcType=VARCHAR}
+ and inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR}
+ and inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR}
+ </select>
+ <select id="selectByCondition"
+ parameterType="org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest"
+ resultType="org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_heartbeat
+ <where>
+ component = #{request.component, jdbcType=VARCHAR}
+ and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
+ <if test="request.instance != null and request.instance != ''">
+ and instance = #{request.instance, jdbcType=VARCHAR}
+ </if>
+ <if test="request.inlongStreamId != null and request.inlongStreamId != ''">
+ and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
+ </if>
+ order by modify_time desc
+ </where>
+ </select>
+
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete
+ from stream_heartbeat
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
index ea8acf4aa..4dfe64752 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/HeartbeatService.java
@@ -20,7 +20,9 @@ package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
/**
@@ -29,73 +31,59 @@ import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
public interface HeartbeatService {
/**
- * report Heartbeat
+ * Report heartbeat for inlong component, such as Agent, Sort, etc.
+ *
* @param request request
- * @return
+ * @return whether the heartbeat was saved successfully
*/
- String reportHeartbeatInfo(HeartbeatReportRequest request);
+ Boolean reportHeartbeat(HeartbeatReportRequest request);
/**
- * get Component HeartbeatInfo
- * @param component component
- * @param instance instance
- * @return ComponentHeartBeatResponse
+ * Get component heartbeat
+ *
+ * @param request query request of heartbeat
+ * @return component heartbeat
*/
- ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
- String instance);
+ ComponentHeartbeatResponse getComponentHeartbeat(HeartbeatQueryRequest request);
/**
- * get Group HeartbeatInfo
- * @param component component
- * @param instance instance
- * @param inlongGroupId inlongGroupId
- * @return GroupHeartbeatResponse
+ * Get inlong group heartbeat
+ *
+ * @param request query request of heartbeat
+ * @return group heartbeat
*/
- GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
- String instance, String inlongGroupId);
+ GroupHeartbeatResponse getGroupHeartbeat(HeartbeatQueryRequest request);
/**
- * get Stream HeartbeatInfo
- * @param component component
- * @param instance instance
- * @param inlongGroupId inlongGroupId
- * @param inlongStreamId inlongStreamId
- * @return StreamHeartBeatResponse
+ * Get inlong stream heartbeat
+ *
+ * @param request query request of heartbeat
+ * @return stream heartbeat
*/
- StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
- String instance, String inlongGroupId, String inlongStreamId);
+ StreamHeartbeatResponse getStreamHeartbeat(HeartbeatQueryRequest request);
/**
- * get Component HeartbeatInfos by page
- * @param component component
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return one page of ComponentHeartBeatResponse
+ * List component heartbeat by page
+ *
+ * @param request paging query request
+ * @return one page of component heartbeats
*/
- PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component, int pageNum,
- int pageSize);
+ PageInfo<ComponentHeartbeatResponse> listComponentHeartbeat(HeartbeatPageRequest request);
/**
- * get Group HeartbeatInfos by page
- * @param component component
- * @param instance instance
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return one page of GroupHeartbeatResponse
+ * List group heartbeat by page
+ *
+ * @param request paging query request
+ * @return one page of group heartbeats
*/
- PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
- String instance, int pageNum, int pageSize);
+ PageInfo<GroupHeartbeatResponse> listGroupHeartbeat(HeartbeatPageRequest request);
/**
- * get Stream HeartbeatInfos
- * @param component component
- * @param instance instance
- * @param inlongGroupId inlongGroupId
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return one page of GroupHeartbeatResponse
+ * List stream heartbeat by page
+ *
+ * @param request paging query request
+ * @return one page of stream heartbeats
*/
- PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
- String instance, String inlongGroupId, int pageNum, int pageSize);
+ PageInfo<StreamHeartbeatResponse> listStreamHeartbeat(HeartbeatPageRequest request);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
index 3b46c771f..07b745615 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
@@ -21,23 +21,25 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.gson.Gson;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
@@ -47,454 +49,238 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
+
/**
- * report or query heartbeat info
+ * Heartbeat service layer implementation
*/
@Service
-public class HeartbeatServiceImpl
- implements HeartbeatService {
+public class HeartbeatServiceImpl implements HeartbeatService {
private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatServiceImpl.class);
+ private static final Gson GSON = new Gson();
@Autowired
- private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
-
+ private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
@Autowired
- private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
-
+ private GroupHeartbeatEntityMapper groupHeartbeatMapper;
@Autowired
- private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
+ private StreamHeartbeatEntityMapper streamHeartbeatMapper;
- private static Gson gson = new Gson();
-
- /**
- * heartbeat common handler
- * @param request request
- * @return
- */
@Override
- public String reportHeartbeatInfo(HeartbeatReportRequest request) {
- if (request != null && StringUtils.isNotEmpty(request.getComponent())) {
- ComponentTypeEnum componentType =
- ComponentTypeEnum.valueOf(request.getComponent());
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- default:
- updateByDefaultWay(request);
- }
- } else {
- LOGGER.warn("request is null or component [{}] is not supported",
- request.getComponent());
+ public Boolean reportHeartbeat(HeartbeatReportRequest request) {
+ if (request == null || StringUtils.isBlank(request.getComponent())) {
+ LOGGER.warn("request is null or component null, just return");
+ return false;
}
- return "Success";
- }
-
- /**
- * update Heartbeat Data
- * @param request request
- */
- private void updateComponentHeartbeatData(HeartbeatReportRequest request) {
- if (request == null || request.getComponentHeartbeat() == null) {
- return;
- }
- ComponentHeartbeatEntityWithBLOBs entity = new ComponentHeartbeatEntityWithBLOBs();
- entity.setComponent(request.getComponent());
- entity.setInstance(request.getInstance());
- entity.setReportTime(new Date(request.getReportTimestamp()));
- if (StringUtils.isNotEmpty(request.getComponentHeartbeat().getStatusHeartbeat())) {
- entity.setStatusHeartbeat(request.getComponentHeartbeat().getStatusHeartbeat());
- }
- if (StringUtils.isNotEmpty(request.getComponentHeartbeat().getMetricHeartbeat())) {
- entity.setMetricHeartbeat(request.getComponentHeartbeat().getMetricHeartbeat());
- }
- int count = componentHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
- if (count == 0) {
- componentHeartbeatEntityMapper.insert(entity);
- }
- }
-
- /**
- * update Component StaticData
- * @param request request
- */
- private void updateGroupHeartbeatData(HeartbeatReportRequest request) {
- List<GroupHeartbeat> list = request.getGroupHeartbeats();
- if (list != null) {
- for (GroupHeartbeat info : list) {
- GroupHeartbeatEntityWithBLOBs entity = new GroupHeartbeatEntityWithBLOBs();
- entity.setComponent(request.getComponent());
- entity.setInstance(request.getInstance());
- entity.setReportTime(new Date(request.getReportTimestamp()));
- entity.setInlongGroupId(info.getInlongGroupId());
- entity.setMetricHeartbeat(info.getMetricHeartbeat());
- entity.setStatusHeartbeat(info.getStatusHeartbeat());
- int count = groupHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
- if (count == 0) {
- groupHeartbeatEntityMapper.insert(entity);
- }
- }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("received heartbeat: " + request);
}
- }
- /**
- * update Stream Status Data
- * @param request request
- */
- private void updateStreamHeartBeatData(HeartbeatReportRequest request) {
- List<StreamHeartbeat> list = request.getStreamHeartbeats();
- if (list != null) {
- for (StreamHeartbeat info : list) {
- StreamHeartbeatEntityWithBLOBs entity = new StreamHeartbeatEntityWithBLOBs();
- entity.setComponent(request.getComponent());
- entity.setInstance(request.getInstance());
- entity.setReportTime(new Date(request.getReportTimestamp()));
- entity.setInlongGroupId(info.getInlongGroupId());
- entity.setInlongStreamId(info.getInlongStreamId());
- entity.setMetricHeartbeat(info.getMetricHeartbeat());
- entity.setStatusHeartbeat(info.getStatusHeartbeat());
- int count = streamHeartbeatEntityMapper.updateByKeyWithBLOBs(entity);
- if (count == 0) {
- streamHeartbeatEntityMapper.insert(entity);
- }
- }
+ ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(request.getComponent());
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ return updateHeartbeatOpt(request);
+ default:
+ throw new BusinessException("Unsupported component type for " + request.getComponent());
}
}
- /**
- * get heartbeat info
- * @param component componentTypeName
- * @param instance instanceIdentifier
- * @return heartbeat info
- */
@Override
- public ComponentHeartbeatResponse getComponentHeartbeatInfo(String component,
- String instance) {
- if (component != null && StringUtils.isNotEmpty(component)) {
- ComponentTypeEnum componentType =
- ComponentTypeEnum.valueOf(component);
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- default:
- return getComponentHeartbeatInfoByDefaultWay(component, instance);
- }
- } else {
- LOGGER.warn("request is null or component type is null");
+ public ComponentHeartbeatResponse getComponentHeartbeat(HeartbeatQueryRequest request) {
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+ String component = request.getComponent();
+ Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+ Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+
+ ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ ComponentHeartbeatEntity res = componentHeartbeatMapper.selectByKey(component, request.getInstance());
+ return CommonBeanUtils.copyProperties(res, ComponentHeartbeatResponse::new);
+ default:
+ throw new BusinessException("Unsupported component type for " + component);
}
- return null;
}
- /**
- * get heartbeat static info
- * @param component componentTypeName
- * @param instance instanceIdentifier
- * @param inlongGroupId inlongGroupId
- * @return heartbeatStaticInfoResponse
- */
@Override
- public GroupHeartbeatResponse getGroupHeartbeatInfo(String component,
- String instance, String inlongGroupId) {
- if (component != null && StringUtils.isNotEmpty(component)) {
- ComponentTypeEnum componentType =
- ComponentTypeEnum.valueOf(component);
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- default:
- return getGroupHeartbeatByDefaultWay(component, instance, inlongGroupId);
- }
- } else {
- LOGGER.warn("request is null or component type is null");
+ public GroupHeartbeatResponse getGroupHeartbeat(HeartbeatQueryRequest request) {
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+ String component = request.getComponent();
+ Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+ Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+ Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
+ ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ GroupHeartbeatEntity result = groupHeartbeatMapper.selectByKey(component, request.getInstance(),
+ request.getInlongGroupId());
+ return CommonBeanUtils.copyProperties(result, GroupHeartbeatResponse::new);
+ default:
+ throw new BusinessException("Unsupported component type for " + component);
}
- return null;
}
- /**
- * get heartbeat status info
- * @param component componentTypeName
- * @param instance instanceIdentifier
- * @param inlongGroupId inlongGroupId
- * @param inlongStreamId inlongStreamId
- * @return heartbeatStatusInfoResponse
- */
@Override
- public StreamHeartbeatResponse getStreamHeartbeatInfo(String component,
- String instance, String inlongGroupId, String inlongStreamId) {
- if (component != null && StringUtils.isNotEmpty(component)) {
- ComponentTypeEnum componentType =
- ComponentTypeEnum.valueOf(component);
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- default:
- return getStreamHeartbeatByDefaultWay(component, instance,
- inlongGroupId, inlongStreamId);
- }
- } else {
- LOGGER.warn("request is null or component type is null");
+ public StreamHeartbeatResponse getStreamHeartbeat(HeartbeatQueryRequest request) {
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+ String component = request.getComponent();
+ Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+ Preconditions.checkNotEmpty(request.getInstance(), ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
+ Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotEmpty(request.getInlongStreamId(), ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+
+ ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ StreamHeartbeatEntity result = streamHeartbeatMapper.selectByKey(component, request.getInstance(),
+ request.getInlongGroupId(), request.getInlongStreamId());
+ return CommonBeanUtils.copyProperties(result, StreamHeartbeatResponse::new);
+ default:
+ throw new BusinessException("Unsupported component type for " + component);
}
- return null;
}
- /**
- * get component heartbeat infos
- * @param component component
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return pageInfos
- */
@Override
- public PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfos(String component,
- int pageNum, int pageSize) {
- if (component != null && StringUtils.isNotEmpty(component)) {
- ComponentTypeEnum componentType =
- ComponentTypeEnum.valueOf(component);
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- default:
- return getComponentHeartbeatInfosByDefaultWay(component, pageNum, pageSize);
- }
- } else {
- LOGGER.warn("request is null or component type is null");
+ public PageInfo<ComponentHeartbeatResponse> listComponentHeartbeat(HeartbeatPageRequest request) {
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+ String component = request.getComponent();
+ Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+
+ ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ return listComponentHeartbeatOpt(request);
+ default:
+ throw new BusinessException("Unsupported component type for " + component);
}
- return null;
}
- /**
- * get group heartbeat infos
- * @param component component
- * @param instance instance
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return pageInfo
- */
@Override
- public PageInfo<GroupHeartbeatResponse> getGroupHeartbeatInfos(String component,
- String instance, int pageNum, int pageSize) {
- if (component != null && StringUtils.isNotEmpty(component)) {
- ComponentTypeEnum componentType =
- ComponentTypeEnum.valueOf(component);
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- default:
- return getGroupHeartbeatsByDefaultWay(component, instance,
- pageNum, pageSize);
- }
- } else {
- LOGGER.warn("request is null or component type is null");
+ public PageInfo<GroupHeartbeatResponse> listGroupHeartbeat(HeartbeatPageRequest request) {
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+ String component = request.getComponent();
+ Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+
+ ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ return listGroupHeartbeatOpt(request);
+ default:
+ throw new BusinessException("Unsupported component type for " + component);
}
- return null;
}
- /**
- * get stream heartbeat infos
- * @param component component
- * @param instance instance
- * @param inlongGroupId inlongGroupId
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return pageInfo
- */
@Override
- public PageInfo<StreamHeartbeatResponse> getStreamHeartbeatInfos(String component,
- String instance, String inlongGroupId, int pageNum, int pageSize) {
- if (component != null && StringUtils.isNotEmpty(component)) {
- ComponentTypeEnum componentType =
- ComponentTypeEnum.valueOf(component);
- switch (componentType) {
- case Sort:
- case DataProxy:
- case Agent:
- case Cache:
- default:
- return getStreamHeartbeatsByDefaultWay(component, instance,
- inlongGroupId, pageNum, pageSize);
- }
- } else {
- LOGGER.warn("request is null or component type is null");
+ public PageInfo<StreamHeartbeatResponse> listStreamHeartbeat(HeartbeatPageRequest request) {
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+ String component = request.getComponent();
+ Preconditions.checkNotEmpty(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
+ Preconditions.checkNotEmpty(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+
+ ComponentTypeEnum componentType = ComponentTypeEnum.valueOf(component);
+ switch (componentType) {
+ case Sort:
+ case DataProxy:
+ case Agent:
+ case Cache:
+ return listStreamHeartbeatOpt(request);
+ default:
+ throw new BusinessException("Unsupported component type for " + component);
}
- return null;
}
/**
- * update By DefaultWay
- * @param request request
+ * Default implementation for updating heartbeat
*/
- private void updateByDefaultWay(HeartbeatReportRequest request) {
+ private Boolean updateHeartbeatOpt(HeartbeatReportRequest request) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("InlongHeartbeatReportRequest json = {}", gson.toJson(request));
+ LOGGER.debug("heartbeat request json = {}", GSON.toJson(request));
}
- updateComponentHeartbeatData(request);
- updateGroupHeartbeatData(request);
- updateStreamHeartBeatData(request);
- }
+ String component = request.getComponent();
+ String instance = request.getInstance();
+ Long reportTime = request.getReportTime();
- /**
- * get Heartbeat InfoByDefaultWay
- * @param component component
- * @param instance instance
- * @return HeartbeatInfoResponse
- */
- private ComponentHeartbeatResponse getComponentHeartbeatInfoByDefaultWay(String component,
- String instance) {
- ComponentHeartbeatEntityWithBLOBs result =
- componentHeartbeatEntityMapper.selectByKey(component, instance);
- ComponentHeartbeatResponse componentHeartBeatResponse = null;
- if (result != null) {
- componentHeartBeatResponse = new ComponentHeartbeatResponse();
- componentHeartBeatResponse.setComponent(result.getComponent());
- componentHeartBeatResponse.setInstance(result.getInstance());
- componentHeartBeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
- componentHeartBeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
- componentHeartBeatResponse.setReportTime(result.getReportTime().getTime());
+ // Add component heartbeats
+ ComponentHeartbeat componentHeartbeat = request.getComponentHeartbeat();
+ if (componentHeartbeat != null) {
+ ComponentHeartbeatEntity entity = new ComponentHeartbeatEntity();
+ entity.setComponent(component);
+ entity.setInstance(instance);
+ entity.setReportTime(reportTime);
+ entity.setStatusHeartbeat(componentHeartbeat.getStatusHeartbeat());
+ entity.setMetricHeartbeat(componentHeartbeat.getMetricHeartbeat());
+ componentHeartbeatMapper.insertOrUpdateByKey(entity);
}
- return componentHeartBeatResponse;
- }
- /**
- * get heartbeat StaticInfo ByDefaultWay
- * @param component component
- * @param instance instance
- * @param inlongGroupId inlongGroupId
- * @return heartbeatStaticInfoResponse
- */
- private GroupHeartbeatResponse getGroupHeartbeatByDefaultWay(String component,
- String instance, String inlongGroupId) {
-
- GroupHeartbeatEntityWithBLOBs result =
- groupHeartbeatEntityMapper.selectByKey(component,
- instance, inlongGroupId);
- GroupHeartbeatResponse groupHeartbeatResponse = null;
- if (result != null) {
- groupHeartbeatResponse = new GroupHeartbeatResponse();
- groupHeartbeatResponse.setInlongGroupId(inlongGroupId);
- groupHeartbeatResponse.setComponent(result.getComponent());
- groupHeartbeatResponse.setInstance(result.getInstance());
- groupHeartbeatResponse.setReportTime(result.getReportTime().getTime());
- groupHeartbeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
- groupHeartbeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
+ // Add group heartbeats
+ List<GroupHeartbeat> groupHeartbeats = request.getGroupHeartbeats();
+ if (CollectionUtils.isNotEmpty(groupHeartbeats)) {
+ groupHeartbeatMapper.insertOrUpdateAll(component, instance, reportTime, groupHeartbeats);
}
- return groupHeartbeatResponse;
- }
- /**
- * get Heartbeat StatusInfo ByDefaultWay
- * @param componentTypeName componentTypeName
- * @param instanceIdentifier instanceIdentifier
- * @param inlongGroupId inlongGroupId
- * @return heartbeatStatusInfoResponse
- */
- private StreamHeartbeatResponse getStreamHeartbeatByDefaultWay(String componentTypeName,
- String instanceIdentifier, String inlongGroupId, String inlongStreamId) {
- StreamHeartbeatEntityWithBLOBs result =
- streamHeartbeatEntityMapper.selectByKey(componentTypeName,
- instanceIdentifier, inlongGroupId, inlongStreamId);
- StreamHeartbeatResponse streamHeartBeatResponse = null;
- if (result != null) {
- streamHeartBeatResponse = new StreamHeartbeatResponse();
- streamHeartBeatResponse.setComponent(result.getComponent());
- streamHeartBeatResponse.setInstance(result.getInstance());
- streamHeartBeatResponse.setReportTime(result.getReportTime().getTime());
- streamHeartBeatResponse.setInlongGroupId(result.getInlongGroupId());
- streamHeartBeatResponse.setInlongStreamId(result.getInlongStreamId());
- streamHeartBeatResponse.setMetricHeartbeat(result.getMetricHeartbeat());
- streamHeartBeatResponse.setStatusHeartbeat(result.getStatusHeartbeat());
+ // Add stream heartbeats
+ List<StreamHeartbeat> streamHeartbeats = request.getStreamHeartbeats();
+ if (CollectionUtils.isNotEmpty(streamHeartbeats)) {
+ streamHeartbeatMapper.insertOrUpdateAll(component, instance, reportTime, streamHeartbeats);
}
- return streamHeartBeatResponse;
+
+ return true;
}
- /**
- * get Heartbeat InfoByDefaultWay
- * @param component component
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return HeartbeatInfoResponse
- */
- private PageInfo<ComponentHeartbeatResponse> getComponentHeartbeatInfosByDefaultWay(String component,
- int pageNum, int pageSize) {
- Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
- PageHelper.startPage(pageNum, pageSize);
-
- Page<ComponentHeartbeatEntityWithBLOBs> entityPage = (Page<ComponentHeartbeatEntityWithBLOBs>)
- componentHeartbeatEntityMapper.selectHeartbeats(component);
-
- List<ComponentHeartbeatResponse> componentHeartBeatResponses = new ArrayList<>();
- PageInfo<ComponentHeartbeatResponse> pageInfo;
- if (entityPage != null) {
- componentHeartBeatResponses = CommonBeanUtils
- .copyListProperties(entityPage, ComponentHeartbeatResponse::new);
- }
- pageInfo = new PageInfo<>(componentHeartBeatResponses);
- pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+ /**
+ * Default implementation for listing component heartbeats by page.
+ *
+ * @param request paging query request, supplies pageNum, pageSize and the query conditions
+ * @return page info holding the converted responses of the current page and the overall record count
+ */
+ private PageInfo<ComponentHeartbeatResponse> listComponentHeartbeatOpt(HeartbeatPageRequest request) {
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+ Page<ComponentHeartbeatEntity> entityPage = (Page<ComponentHeartbeatEntity>)
+ componentHeartbeatMapper.selectByCondition(request);
+ List<ComponentHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
+ ComponentHeartbeatResponse::new);
+
+ PageInfo<ComponentHeartbeatResponse> pageInfo = new PageInfo<>(responseList);
+ // responseList only holds the current page, so the overall count must come from the Page itself
+ pageInfo.setTotal(entityPage.getTotal());
return pageInfo;
}
- /**
- * get heartbeat StaticInfo ByDefaultWay
- * @param component component
- * @param instance instance
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return heartbeatStaticInfoResponse
- */
- private PageInfo<GroupHeartbeatResponse> getGroupHeartbeatsByDefaultWay(String component,
- String instance, int pageNum, int pageSize) {
- Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
- Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
- PageHelper.startPage(pageNum, pageSize);
- Page<GroupHeartbeatEntityWithBLOBs> entityPage = (Page<GroupHeartbeatEntityWithBLOBs>)
- groupHeartbeatEntityMapper.selectHeartbeats(component, instance);
- List<GroupHeartbeatResponse> groupHeartbeatResponses = new ArrayList<>();
- PageInfo<GroupHeartbeatResponse> pageInfo;
- if (entityPage != null) {
- groupHeartbeatResponses = CommonBeanUtils
- .copyListProperties(entityPage, GroupHeartbeatResponse::new);
- }
- pageInfo = new PageInfo<>(groupHeartbeatResponses);
- pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+ /**
+ * Default implementation for listing group heartbeats by page.
+ *
+ * @param request paging query request, supplies pageNum, pageSize and the query conditions
+ * @return page info holding the converted responses of the current page and the overall record count
+ */
+ private PageInfo<GroupHeartbeatResponse> listGroupHeartbeatOpt(HeartbeatPageRequest request) {
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+ Page<GroupHeartbeatEntity> entityPage = (Page<GroupHeartbeatEntity>) groupHeartbeatMapper.selectByCondition(
+ request);
+ List<GroupHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
+ GroupHeartbeatResponse::new);
+
+ PageInfo<GroupHeartbeatResponse> pageInfo = new PageInfo<>(responseList);
+ // responseList only holds the current page, so the overall count must come from the Page itself
+ pageInfo.setTotal(entityPage.getTotal());
return pageInfo;
}
- /**
- * get Heartbeat StatusInfo ByDefaultWay
- * @param component component
- * @param instance instance
- * @param inlongGroupId inlongGroupId
- * @param pageNum pageNum
- * @param pageSize pageSize
- * @return heartbeatStatusInfoResponse
- */
- private PageInfo<StreamHeartbeatResponse> getStreamHeartbeatsByDefaultWay(String component,
- String instance, String inlongGroupId, int pageNum, int pageSize) {
- Preconditions.checkNotNull(component, ErrorCodeEnum.REQUEST_COMPONENT_EMPTY.getMessage());
- Preconditions.checkNotNull(instance, ErrorCodeEnum.REQUEST_INSTANCE_EMPTY.getMessage());
- Preconditions.checkNotNull(inlongGroupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
- PageHelper.startPage(pageNum, pageSize);
- Page<StreamHeartbeatEntityWithBLOBs> entityPage = (Page<StreamHeartbeatEntityWithBLOBs>)
- streamHeartbeatEntityMapper.selectHeartbeats(component, instance, inlongGroupId);
- List<StreamHeartbeatResponse> streamHeartBeatResponses = new ArrayList<>();
- PageInfo<StreamHeartbeatResponse> pageInfo;
- if (entityPage != null) {
- streamHeartBeatResponses = CommonBeanUtils
- .copyListProperties(entityPage, StreamHeartbeatResponse::new);
- }
- pageInfo = new PageInfo<>(streamHeartBeatResponses);
- pageInfo.setTotal(entityPage == null ? 0 : entityPage.getTotal());
+ /**
+ * Default implementation for listing stream heartbeats by page.
+ *
+ * @param request paging query request, supplies pageNum, pageSize and the query conditions
+ * @return page info holding the converted responses of the current page and the overall record count
+ */
+ private PageInfo<StreamHeartbeatResponse> listStreamHeartbeatOpt(HeartbeatPageRequest request) {
+ PageHelper.startPage(request.getPageNum(), request.getPageSize());
+ Page<StreamHeartbeatEntity> entityPage = (Page<StreamHeartbeatEntity>)
+ streamHeartbeatMapper.selectByCondition(request);
+ List<StreamHeartbeatResponse> responseList = CommonBeanUtils.copyListProperties(entityPage,
+ StreamHeartbeatResponse::new);
+
+ PageInfo<StreamHeartbeatResponse> pageInfo = new PageInfo<>(responseList);
+ // responseList only holds the current page, so the overall count must come from the Page itself
+ pageInfo.setTotal(entityPage.getTotal());
return pageInfo;
}
+
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
new file mode 100644
index 000000000..06d0cc5d2
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.impl;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntity;
+import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntity;
+import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.BDDMockito.given;
+
+@RunWith(SpringRunner.class)
+public class HeartbeatServiceTest {
+
+ @InjectMocks
+ private HeartbeatService heartbeatService = new HeartbeatServiceImpl();
+ @Mock
+ private ComponentHeartbeatEntityMapper componentHeartbeatMapper;
+ @Mock
+ private GroupHeartbeatEntityMapper groupHeartbeatMapper;
+ @Mock
+ private StreamHeartbeatEntityMapper streamHeartbeatMapper;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ // Stub the three mocked mappers with one heartbeat record each, so no database is needed
+ ComponentHeartbeatEntity componentHeartbeat = new ComponentHeartbeatEntity();
+ componentHeartbeat.setComponent("Sort");
+ componentHeartbeat.setInstance("127.0.0.1");
+ componentHeartbeat.setStatusHeartbeat("[{\"status\":\"running\"}]");
+ componentHeartbeat.setMetricHeartbeat("[{\"mem\":\"16gb\",\"cpu\":\"60%\"}]");
+ componentHeartbeat.setReportTime(System.currentTimeMillis());
+ Page<ComponentHeartbeatEntity> componentPage = new Page<>();
+ componentPage.add(componentHeartbeat);
+ componentPage.setTotal(1);
+ // Use a matcher: a stub given a fresh empty entity only applies to arguments equal to that instance
+ given(componentHeartbeatMapper.insert(Mockito.any(ComponentHeartbeatEntity.class))).willReturn(1);
+ given(componentHeartbeatMapper.selectByKey(Mockito.anyString(),
+ Mockito.anyString())).willReturn(componentHeartbeat);
+ given(componentHeartbeatMapper.selectByCondition(Mockito.any())).willReturn(componentPage);
+
+ GroupHeartbeatEntity groupHeartbeat = new GroupHeartbeatEntity();
+ groupHeartbeat.setComponent("Sort");
+ groupHeartbeat.setInstance("127.0.0.1");
+ groupHeartbeat.setStatusHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+ + ": \"10\"},\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+ groupHeartbeat.setReportTime(System.currentTimeMillis());
+ groupHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+ + ": \"10\"},"
+ + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+ Page<GroupHeartbeatEntity> groupPage = new Page<>();
+ groupPage.add(groupHeartbeat);
+ groupPage.setTotal(1);
+ given(groupHeartbeatMapper.selectByKey(Mockito.anyString(),
+ Mockito.anyString(), Mockito.anyString())).willReturn(groupHeartbeat);
+ given(groupHeartbeatMapper.selectByCondition(Mockito.any())).willReturn(groupPage);
+
+ StreamHeartbeatEntity streamHeartbeat = new StreamHeartbeatEntity();
+ streamHeartbeat.setComponent("Sort");
+ streamHeartbeat.setInstance("127.0.0.1");
+ streamHeartbeat.setInlongGroupId("group1");
+ streamHeartbeat.setInlongStreamId("test_test");
+ streamHeartbeat.setStatusHeartbeat("[{\"status\":\"running\"}]");
+ streamHeartbeat.setMetricHeartbeat("[{\"outMsg\":\"1\",\"inMsg\":2}]");
+ streamHeartbeat.setReportTime(System.currentTimeMillis());
+ Page<StreamHeartbeatEntity> streamPage = new Page<>();
+ streamPage.add(streamHeartbeat);
+ streamPage.setTotal(1);
+ given(streamHeartbeatMapper.selectByKey(Mockito.anyString(),
+ Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
+ .willReturn(streamHeartbeat);
+ given(streamHeartbeatMapper.selectByCondition(Mockito.any())).willReturn(streamPage);
+ }
+
+ @Test
+ public void testReportHeartbeat() {
+ HeartbeatReportRequest request = new HeartbeatReportRequest();
+ request.setComponent("Sort");
+ request.setInstance("127.0.0.1");
+ request.setReportTime(Instant.now().toEpochMilli());
+ // NOTE(review): componentHeartbeat below is never attached to the request - confirm the intended setter
+ ComponentHeartbeat componentHeartbeat = new ComponentHeartbeat();
+ componentHeartbeat.setMetricHeartbeat("{\"mem\":\"100\"}");
+ componentHeartbeat.setStatusHeartbeat("{\"runningTime\":\"10h.35m\","
+ + "\"status\":\"10h.35m\","
+ + "\"groupIds\":\"group1,group2\"}");
+
+ GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
+ groupHeartbeat.setInlongGroupId("group1");
+ groupHeartbeat.setStatusHeartbeat("[{\"status\":\"running\",\"streamIds\":\"1,2,3,4\"}]");
+ List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
+ groupHeartbeats.add(groupHeartbeat);
+ request.setGroupHeartbeats(groupHeartbeats);
+
+ StreamHeartbeat streamHeartbeat = new StreamHeartbeat();
+ streamHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
+ + ": \"10\"},\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
+ streamHeartbeat.setStatusHeartbeat("{}");
+ streamHeartbeat.setInlongGroupId("group1");
+ streamHeartbeat.setInlongStreamId("stream1");
+ List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
+ streamHeartbeats.add(streamHeartbeat);
+ request.setStreamHeartbeats(streamHeartbeats);
+ Assert.assertTrue(heartbeatService.reportHeartbeat(request));
+ }
+
+ @Test
+ public void testGetComponentHeartbeat() {
+ HeartbeatQueryRequest request = new HeartbeatQueryRequest();
+ request.setComponent("Sort");
+ request.setInstance("127.0.0.1");
+ ComponentHeartbeatResponse response = heartbeatService.getComponentHeartbeat(request);
+ Assert.assertEquals("127.0.0.1", response.getInstance());
+ }
+
+ @Test
+ public void testGetGroupHeartbeat() {
+ HeartbeatQueryRequest request = new HeartbeatQueryRequest();
+ request.setComponent("Sort");
+ request.setInstance("127.0.0.1");
+ request.setInlongGroupId("group1");
+ GroupHeartbeatResponse response = heartbeatService.getGroupHeartbeat(request);
+ Assert.assertEquals("127.0.0.1", response.getInstance());
+ }
+
+ @Test
+ public void testGetStreamHeartbeat() {
+ HeartbeatQueryRequest request = new HeartbeatQueryRequest();
+ request.setComponent("Sort");
+ request.setInstance("127.0.0.1");
+ request.setInlongGroupId("group1");
+ request.setInlongStreamId("stream1");
+
+ StreamHeartbeatResponse response = heartbeatService.getStreamHeartbeat(request);
+ Assert.assertEquals("127.0.0.1", response.getInstance());
+ }
+
+ @Test
+ public void testListComponentHeartbeat() {
+ HeartbeatPageRequest request = new HeartbeatPageRequest();
+ request.setComponent("Sort");
+ request.setPageNum(1);
+ request.setPageSize(10);
+ PageInfo<ComponentHeartbeatResponse> pageResponse = heartbeatService.listComponentHeartbeat(request);
+ Assert.assertEquals(1, pageResponse.getTotal());
+ }
+
+ @Test
+ public void testListGroupHeartbeat() {
+ HeartbeatPageRequest request = new HeartbeatPageRequest();
+ request.setComponent("Sort");
+ request.setInstance("127.0.0.1");
+ request.setPageNum(1);
+ request.setPageSize(10);
+ PageInfo<GroupHeartbeatResponse> pageResponse = heartbeatService.listGroupHeartbeat(request);
+ Assert.assertEquals(1, pageResponse.getTotal());
+ }
+
+ @Test
+ public void testListStreamHeartbeat() {
+ HeartbeatPageRequest request = new HeartbeatPageRequest();
+ request.setComponent("Sort");
+ request.setInstance("127.0.0.1");
+ request.setInlongGroupId("group1");
+ request.setPageNum(1);
+ request.setPageSize(10);
+ PageInfo<StreamHeartbeatResponse> pageResponse = heartbeatService.listStreamHeartbeat(request);
+ Assert.assertEquals(1, pageResponse.getTotal());
+ }
+}
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index d797e2091..7d8146bb3 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -621,22 +621,22 @@ CREATE TABLE `stream_source_field`
DROP TABLE IF EXISTS `stream_transform_field`;
CREATE TABLE `stream_transform_field`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
- `transform_id` int(11) NOT NULL COMMENT 'Transform id',
+ `transform_id` int(11) NOT NULL COMMENT 'Transform id',
`transform_type` varchar(15) NOT NULL COMMENT 'Transform type',
`field_name` varchar(50) NOT NULL COMMENT 'Field name',
`field_value` varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
`pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
`field_type` varchar(50) NOT NULL COMMENT 'Field type',
`field_comment` varchar(2000) DEFAULT NULL COMMENT 'Field description',
- `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
`field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
- `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
- `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
PRIMARY KEY (`id`),
- KEY `index_transform_id` (`transform_id`)
+ KEY `index_transform_id` (`transform_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform field table';
@@ -1105,8 +1105,7 @@ CREATE TABLE `sort_source_config`
`ext_params` text DEFAULT NULL COMMENT 'Another fields, will saved as JSON type',
PRIMARY KEY (`id`),
KEY `index_sort_source_config` (`cluster_name`, `task_name`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8mb4 COMMENT ='Sort source config table';
+);
-- ----------------------------
@@ -1126,69 +1125,65 @@ CREATE TABLE `stream_config_log`
`report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'report time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`ip`, `config_name`, `component_name`, `log_type`, `inlong_stream_id`, `inlong_group_id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
+);
-- ----------------------------
--- Table structure for inlong beat heart
+-- Table structure for inlong component heartbeat
-- ----------------------------
DROP TABLE IF EXISTS `component_heartbeat`;
CREATE TABLE `component_heartbeat`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
- `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
- `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
- `status_heartbeat` text NOT NULL COMMENT 'component status info',
- `metric_heartbeat` text NOT NULL COMMENT 'component metric info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+ `status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_beat_heart` (`component`, `instance`),
- KEY `index_report_time` (`report_time`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+ UNIQUE KEY `unique_component_heartbeat` (`component`, `instance`)
+);
-- ----------------------------
--- Table structure for inlong stream status info
+-- Table structure for inlong group heartbeat
-- ----------------------------
DROP TABLE IF EXISTS `group_heartbeat`;
CREATE TABLE `group_heartbeat`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
- `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
- `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
- `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
- `status_heartbeat` text NOT NULL COMMENT 'Inlong group status info',
- `metric_heartbeat` text NOT NULL COMMENT 'Inlong group metric info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+ `status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+ UNIQUE KEY `unique_group_heartbeat` (`component`, `instance`, `inlong_group_id`)
+);
-- ----------------------------
--- Table structure for inlong component static info
+-- Table structure for inlong stream heartbeat
-- ----------------------------
DROP TABLE IF EXISTS `stream_heartbeat`;
CREATE TABLE `stream_heartbeat`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
- `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
- `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
- `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
- `status_heartbeat` text NOT NULL COMMENT 'status info',
- `metric_heartbeat` text NOT NULL COMMENT 'static info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
+ `status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
-) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+ UNIQUE KEY `unique_stream_heartbeat` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
+);
+
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 138d0f68b..5d501a001 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -1182,67 +1182,65 @@ CREATE TABLE `stream_config_log`
DEFAULT CHARSET = utf8 COMMENT ='stream config log report information table';
-- ----------------------------
--- Table structure for inlong beat heart
+-- Table structure for inlong component heartbeat
-- ----------------------------
DROP TABLE IF EXISTS `component_heartbeat`;
CREATE TABLE `component_heartbeat`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
- `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
- `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component identifier ip or id',
- `status_heartbeat` text NOT NULL COMMENT 'component status info',
- `metric_heartbeat` text NOT NULL COMMENT 'component metric info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+ `status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_beat_heart` (`component`, `instance`),
- KEY `index_report_time` (`report_time`)
+ UNIQUE KEY `unique_component_heartbeat` (`component`, `instance`)
) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='inlong heartbeat';
+ DEFAULT CHARSET = utf8 COMMENT ='Inlong component heartbeat';
-- ----------------------------
--- Table structure for inlong stream status info
+-- Table structure for inlong group heartbeat
-- ----------------------------
DROP TABLE IF EXISTS `group_heartbeat`;
CREATE TABLE `group_heartbeat`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
- `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
- `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
- `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
- `status_heartbeat` text NOT NULL COMMENT 'Inlong group status info',
- `metric_heartbeat` text NOT NULL COMMENT 'Inlong group metric info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+ `status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_stream_status` (`component`, `instance`, `inlong_group_id`),
- KEY `index_report_time` (`report_time`)
+ UNIQUE KEY `unique_group_heartbeat` (`component`, `instance`, `inlong_group_id`)
) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='inlong group heartbeat';
+ DEFAULT CHARSET = utf8 COMMENT ='Inlong group heartbeat';
-- ----------------------------
--- Table structure for inlong component static info
+-- Table structure for inlong stream heartbeat
-- ----------------------------
DROP TABLE IF EXISTS `stream_heartbeat`;
CREATE TABLE `stream_heartbeat`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
- `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'component',
- `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'component ip or id',
- `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong group id',
- `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Inlong metric id',
- `status_heartbeat` text NOT NULL COMMENT 'status info',
- `metric_heartbeat` text NOT NULL COMMENT 'static info',
- `report_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'report time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `component` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component name, such as: Agent, Sort...',
+ `instance` varchar(64) NOT NULL DEFAULT '' COMMENT 'Component instance, can be ip, name...',
+ `inlong_group_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL DEFAULT '' COMMENT 'Owning inlong stream id',
+ `status_heartbeat` text NOT NULL COMMENT 'Status heartbeat info',
+ `metric_heartbeat` text NOT NULL COMMENT 'Metric heartbeat info',
+ `report_time` bigint(20) NOT NULL COMMENT 'Report time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
- UNIQUE KEY `index_component_static` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`),
- KEY `index_report_time` (`report_time`)
+ UNIQUE KEY `unique_stream_heartbeat` (`component`, `instance`, `inlong_group_id`, `inlong_stream_id`)
) ENGINE = InnoDB
- DEFAULT CHARSET = utf8 COMMENT ='inlong stream heartbeat';
+ DEFAULT CHARSET = utf8 COMMENT ='Inlong stream heartbeat';
+
-- ----------------------------
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatController.java
new file mode 100644
index 000000000..080f0a80f
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatController.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import com.github.pagehelper.PageInfo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatPageRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatQueryRequest;
+import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
+import org.apache.inlong.manager.service.core.HeartbeatService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/heartbeat")
+@Api(tags = "Heartbeat-API")
+public class HeartbeatController {
+
+ @Autowired
+ private HeartbeatService heartbeatService;
+
+ @RequestMapping(value = "/component/get", method = RequestMethod.POST)
+ @ApiOperation(value = "Get component heartbeat")
+ public Response<ComponentHeartbeatResponse> getComponentHeartbeat(@RequestBody HeartbeatQueryRequest request) {
+ return Response.success(heartbeatService.getComponentHeartbeat(request));
+ }
+
+ @RequestMapping(value = "/group/get", method = RequestMethod.POST)
+ @ApiOperation(value = "Get group heartbeat")
+ public Response<GroupHeartbeatResponse> getGroupHeartbeat(@RequestBody HeartbeatQueryRequest request) {
+ return Response.success(heartbeatService.getGroupHeartbeat(request));
+ }
+
+ @RequestMapping(value = "/stream/get", method = RequestMethod.POST)
+ @ApiOperation(value = "Get stream heartbeat")
+ public Response<StreamHeartbeatResponse> getStreamHeartbeat(@RequestBody HeartbeatQueryRequest request) {
+ return Response.success(heartbeatService.getStreamHeartbeat(request));
+ }
+
+ @RequestMapping(value = "/component/list", method = RequestMethod.POST)
+ @ApiOperation(value = "List component heartbeats")
+ public Response<PageInfo<ComponentHeartbeatResponse>> listComponentHeartbeat(
+ @RequestBody HeartbeatPageRequest request) {
+ return Response.success(heartbeatService.listComponentHeartbeat(request));
+ }
+
+ @RequestMapping(value = "/group/list", method = RequestMethod.POST)
+ @ApiOperation(value = "List group heartbeats")
+ public Response<PageInfo<GroupHeartbeatResponse>> listGroupHeartbeat(@RequestBody HeartbeatPageRequest request) {
+ return Response.success(heartbeatService.listGroupHeartbeat(request));
+ }
+
+ @RequestMapping(value = "/stream/list", method = RequestMethod.POST)
+ @ApiOperation(value = "List stream heartbeats")
+ public Response<PageInfo<StreamHeartbeatResponse>> listStreamHeartbeat(@RequestBody HeartbeatPageRequest request) {
+ return Response.success(heartbeatService.listStreamHeartbeat(request));
+ }
+
+}
+
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
deleted file mode 100644
index 4087499c2..000000000
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/HeartbeatInfoController.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.web.controller;
-
-import com.github.pagehelper.PageInfo;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatPageRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatPageRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatPageRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
-import org.apache.inlong.manager.service.core.HeartbeatService;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RestController;
-
-@RestController
-@RequestMapping("/heartbeat/")
-@Api(tags = "Heartbeat")
-public class HeartbeatInfoController {
-
- @Autowired
- private HeartbeatService heartbeatService;
-
- @RequestMapping(value = "/component/info", method = RequestMethod.POST)
- @ApiOperation(value = "query component heartbeat")
- public Response<ComponentHeartbeatResponse> queryComponentHeartbeatInfo(@RequestBody
- ComponentHeartbeatRequest info) {
- ComponentHeartbeatResponse response =
- heartbeatService.getComponentHeartbeatInfo(info.getComponent(),
- info.getInstance());
- if (response == null) {
- return Response.fail("Not found msg!");
- } else {
- return Response.success(response);
- }
- }
-
- @RequestMapping(value = "/group/info", method = RequestMethod.POST)
- @ApiOperation(value = "query group heartbeat")
- public Response<GroupHeartbeatResponse> queryGroupHeartbeatInfo(@RequestBody
- GroupHeartbeatRequest info) {
- GroupHeartbeatResponse response = heartbeatService
- .getGroupHeartbeatInfo(info.getComponent(), info.getInstance(),
- info.getInlongGroupId());
- if (response == null) {
- return Response.fail("Not found msg!");
- } else {
- return Response.success(response);
- }
- }
-
- @RequestMapping(value = "/stream/info", method = RequestMethod.POST)
- @ApiOperation(value = "query stream heartbeat")
- public Response<StreamHeartbeatResponse> queryStreamHeartbeat(@RequestBody
- StreamHeartbeatRequest info) {
- StreamHeartbeatResponse response = heartbeatService
- .getStreamHeartbeatInfo(info.getComponent(),
- info.getInstance(), info.getInlongGroupId(), info.getInlongStreamId());
- if (response == null) {
- return Response.fail("Not found msg!");
- } else {
- return Response.success(response);
- }
- }
-
- @RequestMapping(value = "/component/list", method = RequestMethod.POST)
- @ApiOperation(value = "query component heartbeats")
- public Response<PageInfo<ComponentHeartbeatResponse>> queryComponentHeartbeatInfos(@RequestBody
- ComponentHeartbeatPageRequest info) {
- PageInfo<ComponentHeartbeatResponse> responses =
- heartbeatService.getComponentHeartbeatInfos(info.getComponent(), info.getPageNum(),
- info.getPageSize());
- if (responses == null) {
- return Response.fail("Not found msg!");
- } else {
- return Response.success(responses);
- }
- }
-
- @RequestMapping(value = "/group/list", method = RequestMethod.POST)
- @ApiOperation(value = "query group heartbeats")
- public Response<PageInfo<GroupHeartbeatResponse>> queryGroupHeartbeatInfos(@RequestBody
- GroupHeartbeatPageRequest info) {
- PageInfo<GroupHeartbeatResponse> responses = heartbeatService
- .getGroupHeartbeatInfos(info.getComponent(), info.getInstance(),
- info.getPageNum(), info.getPageSize());
- if (responses == null) {
- return Response.fail("Not found msg!");
- } else {
- return Response.success(responses);
- }
- }
-
- @RequestMapping(value = "/stream/list", method = RequestMethod.POST)
- @ApiOperation(value = "query stream heartbeats")
- public Response<PageInfo<StreamHeartbeatResponse>> queryStreamHeartbeats(@RequestBody
- StreamHeartbeatPageRequest info) {
- PageInfo<StreamHeartbeatResponse> responses = heartbeatService
- .getStreamHeartbeatInfos(info.getComponent(), info.getInstance(),
- info.getInlongGroupId(), info.getPageNum(), info.getPageSize());
- if (responses == null) {
- return Response.fail("Not found msg!");
- } else {
- return Response.success(responses);
- }
- }
-
-}
-
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenHeartbeatController.java
similarity index 77%
rename from inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
rename to inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenHeartbeatController.java
index ea6cd0e42..5dbf44a34 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenHeartbeatController.java
@@ -23,7 +23,6 @@ import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
import org.apache.inlong.manager.service.core.HeartbeatService;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -31,17 +30,17 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/openapi/heartbeat")
-@Api(tags = "Heartbeat")
-public class HeartbeatController {
+@Api(tags = "Open-Heartbeat-API")
+public class OpenHeartbeatController {
@Autowired
- private HeartbeatService heartBeatService;
+ private HeartbeatService heartbeatService;
- @PostMapping(value = "/report",
- produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
+ @PostMapping(value = "/report")
@ApiOperation(value = "heartbeat report")
- public Response<String> reportHeartbeat(@RequestBody HeartbeatReportRequest info) {
- return Response.success(heartBeatService.reportHeartbeatInfo(info));
+ public Response<Boolean> reportHeartbeat(@RequestBody HeartbeatReportRequest info) {
+ return Response.success(heartbeatService.reportHeartbeat(info));
}
+
}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
deleted file mode 100644
index 7ed34b8b0..000000000
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/openapi/HeartbeatControllerTest.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.web.controller.openapi;
-
-import static org.mockito.BDDMockito.given;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageInfo;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeat;
-import org.apache.inlong.manager.common.pojo.heartbeat.ComponentHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeat;
-import org.apache.inlong.manager.common.pojo.heartbeat.GroupHeartbeatResponse;
-import org.apache.inlong.manager.common.pojo.heartbeat.HeartbeatReportRequest;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeat;
-import org.apache.inlong.manager.common.pojo.heartbeat.StreamHeartbeatResponse;
-import org.apache.inlong.manager.dao.entity.ComponentHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.GroupHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.entity.StreamHeartbeatEntityWithBLOBs;
-import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
-import org.apache.inlong.manager.dao.mapper.GroupHeartbeatEntityMapper;
-import org.apache.inlong.manager.dao.mapper.StreamHeartbeatEntityMapper;
-import org.apache.inlong.manager.service.core.HeartbeatService;
-import org.apache.inlong.manager.service.core.impl.HeartbeatServiceImpl;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@RunWith(SpringRunner.class)
-public class HeartbeatControllerTest {
-
- @InjectMocks
- private HeartbeatService heartbeatService = new HeartbeatServiceImpl();
-
- @Mock
- private ComponentHeartbeatEntityMapper componentHeartbeatEntityMapper;
-
- @Mock
- private GroupHeartbeatEntityMapper groupHeartbeatEntityMapper;
-
- @Mock
- private StreamHeartbeatEntityMapper streamHeartbeatEntityMapper;
-
- @Before
- public void setUp() {
- MockitoAnnotations.openMocks(this);
-
- ComponentHeartbeatEntityWithBLOBs componentHeartbeatEntityWithBLOBs =
- new ComponentHeartbeatEntityWithBLOBs();
- componentHeartbeatEntityWithBLOBs.setComponent("Sort");
- componentHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
- componentHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"inlongGroupId\":\"groupId\","
- + "\"componentStaticInfo\":\"\"}]");
- componentHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"inlongGroupId\":\"groupId\","
- + "\"streamStatusInfo\":\"\"}]");
- componentHeartbeatEntityWithBLOBs.setReportTime(new Date());
- Page<ComponentHeartbeatEntityWithBLOBs> componentPage = new Page<>();
- componentPage.add(componentHeartbeatEntityWithBLOBs);
- componentPage.setTotal(1);
- given(componentHeartbeatEntityMapper.insert(new ComponentHeartbeatEntityWithBLOBs())).willReturn(1);
- given(componentHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
- Mockito.anyString())).willReturn(componentHeartbeatEntityWithBLOBs);
- given(componentHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString()))
- .willReturn(componentPage);
-
- GroupHeartbeatEntityWithBLOBs groupHeartbeatEntityWithBLOBs =
- new GroupHeartbeatEntityWithBLOBs();
- groupHeartbeatEntityWithBLOBs.setComponent("Sort");
- groupHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
- groupHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
- + ": \"10\"},"
- + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
- groupHeartbeatEntityWithBLOBs.setReportTime(new Date());
- groupHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
- + ": \"10\"},"
- + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
- Page<GroupHeartbeatEntityWithBLOBs> groupPage = new Page<>();
- groupPage.add(groupHeartbeatEntityWithBLOBs);
- groupPage.setTotal(1);
- given(groupHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
- Mockito.anyString(), Mockito.anyString())).willReturn(groupHeartbeatEntityWithBLOBs);
- given(groupHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString(),
- Mockito.anyString())).willReturn(groupPage);
-
- StreamHeartbeatEntityWithBLOBs streamHeartbeatEntityWithBLOBs = new StreamHeartbeatEntityWithBLOBs();
- streamHeartbeatEntityWithBLOBs.setComponent("Sort");
- streamHeartbeatEntityWithBLOBs.setInstance("127.0.0.1");
- streamHeartbeatEntityWithBLOBs.setInlongGroupId("group1");
- streamHeartbeatEntityWithBLOBs.setInlongStreamId("test_test");
- streamHeartbeatEntityWithBLOBs.setStatusHeartbeat("[{\"statue\":\"running\"}]");
- streamHeartbeatEntityWithBLOBs.setMetricHeartbeat("[{\"outMsg\":\"1\",\"inMsg\":2}]");
- streamHeartbeatEntityWithBLOBs.setReportTime(new Date());
-
- Page<StreamHeartbeatEntityWithBLOBs> streamPage = new Page<>();
- streamPage.add(streamHeartbeatEntityWithBLOBs);
- streamPage.setTotal(1);
-
- given(streamHeartbeatEntityMapper.selectByKey(Mockito.anyString(),
- Mockito.anyString(), Mockito.anyString(), Mockito.anyString()))
- .willReturn(streamHeartbeatEntityWithBLOBs);
- given(streamHeartbeatEntityMapper.selectHeartbeats(Mockito.anyString(),
- Mockito.anyString(), Mockito.anyString()))
- .willReturn(streamPage);
- }
-
- @Test
- public void testAddHeartbeat() throws Exception {
-
- HeartbeatReportRequest request = new HeartbeatReportRequest();
- request.setComponent("Sort");
- request.setInstance("127.0.0.1");
- request.setReportTimestamp(Instant.now().toEpochMilli());
-
- ComponentHeartbeat componentHeartbeat = new ComponentHeartbeat();
- componentHeartbeat.setMetricHeartbeat("{\"mem\":\"100\"}");
- componentHeartbeat.setStatusHeartbeat("{\"runningTime\":\"10h.35m\","
- + "\"status\":\"10h.35m\","
- + "\"groupIds\":\"group1,group2\"}");
-
- List<GroupHeartbeat> groupHeartbeats = new ArrayList<>();
- GroupHeartbeat groupHeartbeat = new GroupHeartbeat();
- groupHeartbeat.setInlongGroupId("group1");
- groupHeartbeat.setStatusHeartbeat("[{\"status\":\"running\",\"streamIds\":\"1,2,3,4\"}]");
- request.setGroupHeartbeats(groupHeartbeats);
-
- StreamHeartbeat streamHeartbeat = new StreamHeartbeat();
- streamHeartbeat.setMetricHeartbeat("[{\"summaryMetric\":{\"totalRecordNumOfRead\""
- + ": \"10\"},"
- + "\"streamMetrics\":[{\"streamId\":\"stream1\"}]}]");
- streamHeartbeat.setStatusHeartbeat("{}");
- streamHeartbeat.setInlongGroupId("group1");
- streamHeartbeat.setInlongStreamId("1");
- List<StreamHeartbeat> streamHeartbeats = new ArrayList<>();
- streamHeartbeats.add(streamHeartbeat);
- request.setStreamHeartbeats(streamHeartbeats);
-
- Assert.assertEquals("Success", heartbeatService.reportHeartbeatInfo(request));
- }
-
- @Test
- public void testQueryComponentHeartbeat() throws Exception {
- ComponentHeartbeatResponse response
- = heartbeatService.getComponentHeartbeatInfo("Sort", "127.0.0.1");
- Assert.assertEquals("127.0.0.1", response.getInstance());
- }
-
- @Test
- public void testQueryGroupHeartbeat() throws Exception {
- GroupHeartbeatResponse response
- = heartbeatService.getGroupHeartbeatInfo("Sort",
- "127.0.0.1", "group1");
- Assert.assertEquals("127.0.0.1", response.getInstance());
- }
-
- @Test
- public void testQueryStreamHeartbeat() throws Exception {
- StreamHeartbeatResponse response =
- heartbeatService.getStreamHeartbeatInfo("Sort",
- "127.0.0.1", "group1", "stream1");
- Assert.assertEquals("127.0.0.1", response.getInstance());
- }
-
- @Test
- public void testQueryComponentHeartbeatPage() throws Exception {
- PageInfo<ComponentHeartbeatResponse> pageResponse
- = heartbeatService.getComponentHeartbeatInfos("Sort", 1,
- 10);
- Assert.assertEquals(1, pageResponse.getTotal());
- }
-
- @Test
- public void testQueryGroupHeartbeatPage() throws Exception {
- PageInfo<GroupHeartbeatResponse> pageResponse
- = heartbeatService.getGroupHeartbeatInfos("Sort",
- "127.0.0.1", 1, 10);
- Assert.assertEquals(1, pageResponse.getTotal());
- }
-
- @Test
- public void testQueryStreamHeartbeatPage() throws Exception {
- PageInfo<StreamHeartbeatResponse> pageResponse =
- heartbeatService.getStreamHeartbeatInfos("Sort",
- "127.0.0.1", "group1", 1, 10);
- Assert.assertEquals(1, pageResponse.getTotal());
- }
-}