You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/21 03:34:35 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 6972020d1 [INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)
6972020d1 is described below

commit 6972020d1795b00e88ffe761c0cf3b83a33ef2c0
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 21 11:32:13 2022 +0800

    [INLONG-5953][Manager] Filter DataProxy nodes by status when necessary (#5954)
---
 .../mappers/InlongClusterNodeEntityMapper.xml      |  5 ++++-
 .../inlong/manager/pojo/node/DataNodeRequest.java  |  2 +-
 .../service/cluster/InlongClusterServiceImpl.java  |  8 +++++--
 .../service/cluster/InlongClusterServiceTest.java  | 25 +++++++++++++++++++++-
 4 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
index bb4a23f10..c184eab02 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml
@@ -95,9 +95,12 @@
             <if test="type != null and type != ''">
                 and type = #{type, jdbcType=VARCHAR}
             </if>
-            <if test="parentId != null and parentId != ''">
+            <if test="parentId != null">
                 and parent_id = #{parentId, jdbcType=INTEGER}
             </if>
+            <if test="status != null">
+                and status = #{status, jdbcType=INTEGER}
+            </if>
             <if test="keyword != null and keyword != ''">
                 and (
                 ip like CONCAT('%', #{keyword}, '%')
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index 54504e2dc..4f520bf21 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -34,7 +34,7 @@ import javax.validation.constraints.NotNull;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("Data node  request")
+@ApiModel("Data node request")
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
 public abstract class DataNodeRequest {
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index d3a05b7ff..e175f8c00 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.NodeStatus;
 import org.apache.inlong.manager.common.enums.UserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -649,11 +650,14 @@ public class InlongClusterServiceImpl implements InlongClusterService {
             throw new BusinessException(msg);
         }
 
-        // if more than one data proxy cluster, currently takes first
         // TODO consider the data proxy load and re-balance
         List<DataProxyNodeInfo> nodeInfos = new ArrayList<>();
         for (InlongClusterEntity entity : clusterList) {
-            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByParentId(entity.getId());
+            ClusterPageRequest request = ClusterPageRequest.builder()
+                    .parentId(entity.getId())
+                    .status(NodeStatus.NORMAL.getStatus())
+                    .build();
+            List<InlongClusterNodeEntity> nodeList = clusterNodeMapper.selectByCondition(request);
             for (InlongClusterNodeEntity nodeEntity : nodeList) {
                 DataProxyNodeInfo nodeInfo = new DataProxyNodeInfo();
                 nodeInfo.setId(nodeEntity.getId());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index a7249e866..23b2326ce 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.cluster;
 
+import org.apache.inlong.common.enums.ComponentTypeEnum;
+import org.apache.inlong.common.heartbeat.HeartbeatMsg;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.manager.common.consts.MQType;
@@ -31,10 +33,12 @@ import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.Comparator;
 import java.util.List;
 
 /**
@@ -44,6 +48,8 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
 
     @Autowired
     private InlongClusterService clusterService;
+    @Autowired
+    private HeartbeatManager heartbeatManager;
 
     /**
      * Save data proxy cluster
@@ -139,6 +145,16 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         return clusterService.updateNode(request, GLOBAL_OPERATOR);
     }
 
+    private HeartbeatMsg createHeartbeatMsg(String clusterName, String ip, int port, String type) {
+        HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
+        heartbeatMsg.setIp(ip);
+        heartbeatMsg.setPort(port);
+        heartbeatMsg.setComponentType(type);
+        heartbeatMsg.setReportTime(System.currentTimeMillis());
+        heartbeatMsg.setClusterName(clusterName);
+        return heartbeatMsg;
+    }
+
     /**
      * Delete cluster node info.
      */
@@ -201,7 +217,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
     }
 
     @Test
-    public void testGetDataProxyIp() {
+    public void testGetDataProxyIp() throws InterruptedException {
         String clusterTag = "default_cluster";
         String clusterName = "test_data_proxy";
         String extTag = "ext_1";
@@ -220,6 +236,12 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         Integer nodeId2 = this.saveClusterNode(id, ClusterType.DATAPROXY, ip, port2);
         Assertions.assertNotNull(nodeId2);
 
+        // report heartbeat
+        HeartbeatMsg msg1 = createHeartbeatMsg(clusterName, ip, port1, ComponentTypeEnum.DataProxy.getName());
+        heartbeatManager.reportHeartbeat(msg1);
+        HeartbeatMsg msg2 = createHeartbeatMsg(clusterName, ip, port2, ComponentTypeEnum.DataProxy.getName());
+        heartbeatManager.reportHeartbeat(msg2);
+
         // create an inlong group which use the clusterTag
         String inlongGroupId = "test_cluster_tag_group";
         InlongGroupInfo inlongGroup = super.createInlongGroup(inlongGroupId, MQType.PULSAR);
@@ -230,6 +252,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         // get the data proxy nodes, the first port should be p1, the second port should be p2
         DataProxyNodeResponse nodeResponse = clusterService.getDataProxyNodes(inlongGroupId);
         List<DataProxyNodeInfo> ipList = nodeResponse.getNodeList();
+        ipList.sort(Comparator.comparingInt(DataProxyNodeInfo::getId));
         Assertions.assertEquals(ipList.size(), 2);
         Assertions.assertEquals(port1, ipList.get(0).getPort());
         Assertions.assertEquals(port2, ipList.get(1).getPort());