You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/18 06:20:28 UTC

[inlong] branch master updated: [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag (#5090)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fa0cd9b8 [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag (#5090)
3fa0cd9b8 is described below

commit 3fa0cd9b8fe8284a58f6fb3fef59132f91d7c23a
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Jul 18 14:20:23 2022 +0800

    [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag (#5090)
---
 .../service/core/impl/SortSourceServiceImpl.java   | 32 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 4475910aa..123cc47ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -202,25 +202,43 @@ public class SortSourceServiceImpl implements SortSourceService {
                 .filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType()))
                 .collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
 
+        // index clusters by name; if several clusters share a name, the first one is kept.
+        Map<String, SortSourceClusterInfo> name2ClusterInfos = clusterInfos.stream()
+                .collect(Collectors.toMap(SortSourceClusterInfo::getName, info -> info, (g1, g2) -> g1));
+
         // Prepare CacheZones for each cluster and task
         Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
         Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
-        groupMap.forEach((cluster, task2Group) -> {
-
+        groupMap.forEach((clusterName, task2Group) -> {
+
+            // if there is no matched cluster name, just skip
+            if (!name2ClusterInfos.containsKey(clusterName)) {
+                return;
+            }
+            // find valid mq cluster list
+            String clusterTag = name2ClusterInfos.get(clusterName).getClusterTags();
+            final Map<String, List<SortSourceClusterInfo>> validClusterInfos = new ConcurrentHashMap<>();
+            if (allTag2ClusterInfos.containsKey(clusterTag)) {
+                validClusterInfos.put(clusterTag, allTag2ClusterInfos.get(clusterTag));
+            } else {
+                validClusterInfos.putAll(allTag2ClusterInfos);
+            }
+
+            // prepare the new config and md5
             Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
             Map<String, String> task2Md5 = new ConcurrentHashMap<>();
 
             task2Group.forEach((task, groupList) -> {
                 Map<String, CacheZone> cacheZones;
                 try {
-                    cacheZones = this.getCacheZones(groupList, allId2GroupInfos, allTag2ClusterInfos);
+                    cacheZones = this.getCacheZones(groupList, allId2GroupInfos, validClusterInfos);
                 } catch (Throwable t) {
-                    LOGGER.error("fail to get cacheZones of cluster {}, task {}", cluster, task);
+                    LOGGER.error("fail to get cacheZones of clusterName {}, task {}", clusterName, task);
                     return;
                 }
                 CacheZoneConfig config = CacheZoneConfig.builder()
                         .cacheZones(cacheZones)
-                        .sortClusterName(cluster)
+                        .sortClusterName(clusterName)
                         .sortTaskId(task)
                         .build();
                 String jsonStr = GSON.toJson(config);
@@ -229,8 +247,8 @@ public class SortSourceServiceImpl implements SortSourceService {
                 task2Md5.put(task, md5);
             });
 
-            newConfigMap.put(cluster, task2Config);
-            newMd5Map.put(cluster, task2Md5);
+            newConfigMap.put(clusterName, task2Config);
+            newMd5Map.put(clusterName, task2Md5);
         });
 
         sortSourceConfigMap = newConfigMap;