You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 03:34:35 UTC
[inlong] branch release-1.3.0 updated: [INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 6972020d1 [INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)
6972020d1 is described below
commit 6972020d1795b00e88ffe761c0cf3b83a33ef2c0
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 21 11:32:13 2022 +0800
[INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)
---
.../mappers/InlongClusterNodeEntityMapper.xml | 5 ++++-
.../inlong/manager/pojo/node/DataNodeRequest.java | 2 +-
.../service/cluster/InlongClusterServiceImpl.java | 8 +++++--
.../service/cluster/InlongClusterServiceTest.java | 25 +++++++++++++++++++++-
4 files changed, 35 insertions(+), 5 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index bb4a23f10..c184eab02 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -95,9 +95,12 @@
<if test="type != null and type != ''">
and type = #{type, jdbcType=VARCHAR}
</if>
- <if test="parentId != null and parentId != ''">
+ <if test="parentId != null">
and parent_id = #{parentId, jdbcType=INTEGER}
</if>
+ <if test="status != null">
+ and status = #{status, jdbcType=INTEGER}
+ </if>
<if test="keyword != null and keyword != ''">
and (
ip like CONCAT('%', #{keyword}, '%')
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index 54504e2dc..4f520bf21 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -34,7 +34,7 @@ import javax.validation.constraints.NotNull;
@Data
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("Data node request")
+@ApiModel("Data node request")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
public abstract class DataNodeRequest {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index d3a05b7ff..e175f8c00 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -649,11 +650,14 @@ public class InlongClusterServiceImpl implements InlongClusterService {
throw new BusinessException(msg);
}
- // if more than one data proxy cluster, currently takes first
// TODO consider the data proxy load and re-balance
List<DataProxyNodeInfo> nodeInfos = new ArrayList<>();
for (InlongClusterEntity entity : clusterList) {
- List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(entity.getId());
+ ClusterPageRequest request = ClusterPageRequest.builder()
+ .parentId(entity.getId())
+ .status(NodeStatus.NORMAL.getStatus())
+ .build();
+ List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByCondition(request);
for (InlongClusterNodeEntity nodeEntity : nodeList) {
DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
nodeInfo.setId(nodeEntity.getId());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index a7249e866..23b2326ce 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.cluster;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.manager.common.consts.MQType;
@@ -31,10 +33,12 @@ import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.Comparator;
import java.util.List;
/**
@@ -44,6 +48,8 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
@Autowired
private InlongClusterService clusterService;
+ @Autowired
+ private HeartbeatManager heartbeatManager;
/**
* Save data proxy cluster
@@ -139,6 +145,16 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
return clusterService.updateNode(request, GLOBAL_OPERATOR);
}
+ private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, int port, String type) {
+ HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
+ heartbeatMsg.setIp(ip);
+ heartbeatMsg.setPort(port);
+ heartbeatMsg.setComponentType(type);
+ heartbeatMsg.setReportTime(System.currentTimeMillis());
+ heartbeatMsg.setClusterName(clusterName);
+ return heartbeatMsg;
+ }
+
/**
* Delete cluster node info.
*/
@@ -201,7 +217,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
}
@Test
- public void testGetDataProxyIp() {
+ public void testGetDataProxyIp() throws InterruptedException {
String clusterTag = "default_cluster";
String clusterName = "test_data_proxy";
String extTag = "ext_1";
@@ -220,6 +236,12 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2);
Assertions.assertNotNull(nodeId2);
+ // report heartbeat
+ HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, port1, ComponentTypeEnum.DataProxy.getName());
+ heartbeatManager.reportHeartbeat(msg1);
+ HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, port2, ComponentTypeEnum.DataProxy.getName());
+ heartbeatManager.reportHeartbeat(msg2);
+
// create an inlong group which use the clusterTag
String inlongGroupId = "test_cluster_tag_group";
InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId, MQType.PULSAR);
@@ -230,6 +252,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
// get the data proxy nodes, the first port should is p1, second port is p2
DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId);
List<DataProxyNodeInfo> ipList = nodeResponse.getNodeList();
+ ipList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
Assertions.assertEquals(ipList.size(), 2);
Assertions.assertEquals(port1, ipList.get(0).getPort());
Assertions.assertEquals(port2, ipList.get(1).getPort());