You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/30 04:39:27 UTC

[inlong] branch master updated: [INLONG-7254][Manager] Fix config error when InlongGroupId is in the process of switching (#7255)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c57b94279 [INLONG-7254][Manager] Fix config error when InlongGroupId is in the process of switching (#7255)
c57b94279 is described below

commit c57b9427931e293e8f1ac8d8efd9522cc0113eaa
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Jan 30 12:39:21 2023 +0800

    [INLONG-7254][Manager] Fix config error when InlongGroupId is in the process of switching (#7255)
---
 .../service/core/impl/SortSourceServiceImpl.java   | 26 +++++++-
 .../manager/service/sort/SortServiceImplTest.java  | 78 ++++++++++++++++++----
 2 files changed, 88 insertions(+), 16 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index e52a9bfb4..dd772c18c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.inlong.common.pojo.sdk.Topic;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
 import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
@@ -94,6 +95,7 @@ public class SortSourceServiceImpl implements SortSourceService {
      */
     private Map<String, Map<String, CacheZoneConfig>> sortSourceConfigMap = new ConcurrentHashMap<>();
 
+    private Map<String, SortSourceClusterInfo> sortClusters;
     private Map<String, List<SortSourceClusterInfo>> mqClusters;
     private Map<String, SortSourceGroupInfo> groupInfos;
     private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
@@ -187,6 +189,8 @@ public class SortSourceServiceImpl implements SortSourceService {
 
         // reload mq cluster and sort cluster
         List<SortSourceClusterInfo> allClusters = configLoader.loadAllClusters();
+        sortClusters = allClusters.stream()
+                .collect(Collectors.toMap(SortSourceClusterInfo::getName, v -> v));
 
         // group mq clusters by cluster tag
         mqClusters = allClusters.stream()
@@ -256,7 +260,7 @@ public class SortSourceServiceImpl implements SortSourceService {
                                     .sortTaskId(taskName)
                                     .build();
                     Map<String, CacheZone> cacheZoneMap =
-                            this.parseCacheZones(sinkList);
+                            this.parseCacheZones(sortClusterName, sinkList);
                     cacheZoneConfig.setCacheZones(cacheZoneMap);
 
                     // prepare md5
@@ -275,6 +279,7 @@ public class SortSourceServiceImpl implements SortSourceService {
         });
         sortSourceConfigMap = newConfigMap;
         sortSourceMd5Map = newMd5Map;
+        sortClusters = null;
         mqClusters = null;
         groupInfos = null;
         allStreams = null;
@@ -285,8 +290,12 @@ public class SortSourceServiceImpl implements SortSourceService {
     }
 
     private Map<String, CacheZone> parseCacheZones(
+            String clusterName,
             List<SortSourceStreamSinkInfo> sinkList) {
 
+        Preconditions.checkNotNull(sortClusters.get(clusterName), "sort cluster should not be NULL");
+        String sortClusterTag = sortClusters.get(clusterName).getClusterTags();
+
         // get group infos
         List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
                 .filter(sinkInfo -> groupInfos.containsKey(sinkInfo.getGroupId())
@@ -296,6 +305,13 @@ public class SortSourceServiceImpl implements SortSourceService {
 
         // group them by cluster tag.
         Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
+                .filter(sink -> Objects.nonNull(groupInfos.get(sink.getGroupId())))
+                .filter(sink -> {
+                    if (StringUtils.isBlank(sortClusterTag)) {
+                        return true;
+                    }
+                    return sortClusterTag.equals(groupInfos.get(sink.getGroupId()).getClusterTag());
+                })
                 .collect(Collectors.groupingBy(sink -> {
                     SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId());
                     return groupInfo.getClusterTag();
@@ -303,7 +319,13 @@ public class SortSourceServiceImpl implements SortSourceService {
 
         // group them by second cluster tag.
         Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
-                .filter(info -> backupClusterTag.containsKey(info.getGroupId()))
+                .filter(sink -> backupClusterTag.containsKey(sink.getGroupId()))
+                .filter(sink -> {
+                    if (StringUtils.isBlank(sortClusterTag)) {
+                        return true;
+                    }
+                    return sortClusterTag.equals(backupClusterTag.get(sink.getGroupId()));
+                })
                 .collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId())));
 
         List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 9d4d417af..17ee08065 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.manager.service.sort;
 
 import org.apache.inlong.common.constant.ClusterSwitch;
+import org.apache.inlong.common.pojo.sdk.CacheZone;
+import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
 import org.apache.inlong.manager.common.consts.DataNodeType;
@@ -67,8 +69,12 @@ import java.util.Map;
 public class SortServiceImplTest extends ServiceBaseTest {
 
     private static final String TEST_GROUP = "test-group";
-    private static final String TEST_CLUSTER = "test-cluster";
-    private static final String TEST_TASK = "test-task";
+    private static final String TEST_CLUSTER_1 = "test-cluster-1";
+    private static final String TEST_CLUSTER_2 = "test-cluster-2";
+    private static final String TEST_CLUSTER_3 = "test-cluster-3";
+    private static final String TEST_TASK_1 = "test-task-1";
+    private static final String TEST_TASK_2 = "test-task-2";
+    private static final String TEST_TASK_3 = "test-task-3";
     private static final String TEST_STREAM_1 = "1";
     private static final String TEST_STREAM_2 = "2";
     private static final String TEST_TAG = "testTag";
@@ -114,8 +120,8 @@ public class SortServiceImplTest extends ServiceBaseTest {
     @Test
     @Order(2)
     @Transactional
-    public void testSourceCorrectParams() {
-        SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER, TEST_TASK, "");
+    public void testSourceCorrectParamsOfNoTagSortCluster() {
+        SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
         JSONObject jo = new JSONObject(response);
         System.out.println(jo);
         Assertions.assertEquals(0, response.getCode());
@@ -128,9 +134,9 @@ public class SortServiceImplTest extends ServiceBaseTest {
     @Order(3)
     @Transactional
     public void testSourceSameMd5() {
-        SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER, TEST_TASK, "");
+        SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
         String md5 = response.getMd5();
-        response = sortService.getSourceConfig(TEST_CLUSTER, TEST_TASK, md5);
+        response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, md5);
         System.out.println(response);
         Assertions.assertEquals(1, response.getCode());
         Assertions.assertEquals(md5, response.getMd5());
@@ -167,7 +173,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
     @Order(6)
     @Transactional
     public void testClusterCorrectParams() {
-        SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER, "");
+        SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER_1, "");
         JSONObject jo = new JSONObject(response);
         System.out.println(jo);
         Assertions.assertEquals(0, response.getCode());
@@ -180,9 +186,9 @@ public class SortServiceImplTest extends ServiceBaseTest {
     @Order(7)
     @Transactional
     public void testClusterSameMd5() {
-        SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER, "");
+        SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER_1, "");
         String md5 = response.getMd5();
-        response = sortService.getClusterConfig(TEST_CLUSTER, md5);
+        response = sortService.getClusterConfig(TEST_CLUSTER_1, md5);
         System.out.println(response);
         Assertions.assertEquals(1, response.getCode());
         Assertions.assertEquals(md5, response.getMd5());
@@ -202,17 +208,60 @@ public class SortServiceImplTest extends ServiceBaseTest {
         Assertions.assertNotNull(response.getMsg());
     }
 
+    @Test
+    @Order(9)
+    @Transactional
+    public void testSourceCorrectParamsOfTaggedSortCluster() {
+        SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_2, TEST_TASK_2, "");
+        JSONObject jo = new JSONObject(response);
+        System.out.println(jo);
+        Assertions.assertEquals(0, response.getCode());
+        Assertions.assertNotNull(response.getMd5());
+        Assertions.assertNotNull(response.getMsg());
+        CacheZoneConfig config = response.getData();
+        Assertions.assertNotNull(config);
+        Assertions.assertEquals(TEST_CLUSTER_2, config.getSortClusterName());
+        Assertions.assertEquals(TEST_TASK_2, config.getSortTaskId());
+        Assertions.assertEquals(1, config.getCacheZones().size());
+        CacheZone zone = config.getCacheZones().get("testPulsar");
+        Assertions.assertNotNull(zone);
+        Assertions.assertEquals("testPulsar", zone.getZoneName());
+        Assertions.assertEquals(1, zone.getTopics().size());
+
+        response = sortService.getSourceConfig(TEST_CLUSTER_3, TEST_TASK_3, "");
+        jo = new JSONObject(response);
+        System.out.println(jo);
+        Assertions.assertEquals(0, response.getCode());
+        Assertions.assertNotNull(response.getMd5());
+        Assertions.assertNotNull(response.getMsg());
+        config = response.getData();
+        Assertions.assertNotNull(config);
+        Assertions.assertEquals(TEST_CLUSTER_3, config.getSortClusterName());
+        Assertions.assertEquals(TEST_TASK_3, config.getSortTaskId());
+        Assertions.assertEquals(1, config.getCacheZones().size());
+        zone = config.getCacheZones().get("backupPulsar");
+        Assertions.assertNotNull(zone);
+        Assertions.assertEquals("backupPulsar", zone.getZoneName());
+        Assertions.assertEquals(1, zone.getTopics().size());
+    }
+
     @BeforeEach
     private void prepareAll() {
-        this.prepareCluster(TEST_CLUSTER);
+        this.prepareCluster(TEST_CLUSTER_1, null);
+        this.prepareCluster(TEST_CLUSTER_2, TEST_TAG);
+        this.prepareCluster(TEST_CLUSTER_3, BACK_UP_TAG);
         this.preparePulsar("testPulsar", true, TEST_TAG);
         this.preparePulsar("backupPulsar", true, BACK_UP_TAG);
-        this.prepareDataNode(TEST_TASK);
+        this.prepareDataNode(TEST_TASK_1);
+        this.prepareDataNode(TEST_TASK_2);
+        this.prepareDataNode(TEST_TASK_3);
         this.prepareGroupId(TEST_GROUP);
         this.prepareStreamId(TEST_GROUP, TEST_STREAM_1, TEST_TOPIC_1);
         this.prepareStreamId(TEST_GROUP, TEST_STREAM_2, TEST_TOPIC_2);
-        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_1);
-        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_2);
+        this.prepareTask(TEST_TASK_1, TEST_GROUP, TEST_CLUSTER_1, TEST_STREAM_1);
+        this.prepareTask(TEST_TASK_1, TEST_GROUP, TEST_CLUSTER_1, TEST_STREAM_2);
+        this.prepareTask(TEST_TASK_2, TEST_GROUP, TEST_CLUSTER_2, TEST_STREAM_1);
+        this.prepareTask(TEST_TASK_3, TEST_GROUP, TEST_CLUSTER_3, TEST_STREAM_2);
     }
 
     private void prepareDataNode(String taskName) {
@@ -277,13 +326,14 @@ public class SortServiceImplTest extends ServiceBaseTest {
         streamService.save(request, "test_operator");
     }
 
-    private void prepareCluster(String clusterName) {
+    private void prepareCluster(String clusterName, String clusterTag) {
         InlongClusterEntity entity = new InlongClusterEntity();
         entity.setName(clusterName);
         entity.setType(DataNodeType.HIVE);
         entity.setExtParams("{}");
         entity.setCreator(TEST_CREATOR);
         entity.setInCharges(TEST_CREATOR);
+        entity.setClusterTags(clusterTag);
         Date now = new Date();
         entity.setCreateTime(now);
         entity.setModifyTime(now);