You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/18 06:20:28 UTC
[inlong] branch master updated: [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag (#5090)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3fa0cd9b8 [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag (#5090)
3fa0cd9b8 is described below
commit 3fa0cd9b8fe8284a58f6fb3fef59132f91d7c23a
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Jul 18 14:20:23 2022 +0800
[INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag (#5090)
---
.../service/core/impl/SortSourceServiceImpl.java | 32 +++++++++++++++++-----
1 file changed, 25 insertions(+), 7 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 4475910aa..123cc47ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -202,25 +202,43 @@ public class SortSourceServiceImpl implements SortSourceService {
.filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType()))
.collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
+ // index clusters by name; if several clusters share a name, the first one is kept.
+ Map<String, SortSourceClusterInfo> name2ClusterInfos = clusterInfos.stream()
+ .collect(Collectors.toMap(SortSourceClusterInfo::getName, info -> info, (g1, g2) -> g1));
+
// Prepare CacheZones for each cluster and task
Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
- groupMap.forEach((cluster, task2Group) -> {
-
+ groupMap.forEach((clusterName, task2Group) -> {
+
+ // skip this sort cluster if no cluster info is registered under its name
+ if (!name2ClusterInfos.containsKey(clusterName)) {
+ return;
+ }
+ // restrict to the MQ clusters sharing this cluster's tag; if none has that tag, fall back to all MQ clusters
+ String clusterTag = name2ClusterInfos.get(clusterName).getClusterTags();
+ final Map<String, List<SortSourceClusterInfo>> validClusterInfos = new ConcurrentHashMap<>();
+ if (allTag2ClusterInfos.containsKey(clusterTag)) {
+ validClusterInfos.put(clusterTag, allTag2ClusterInfos.get(clusterTag));
+ } else {
+ validClusterInfos.putAll(allTag2ClusterInfos);
+ }
+
+ // prepare the new config and md5
Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
Map<String, String> task2Md5 = new ConcurrentHashMap<>();
task2Group.forEach((task, groupList) -> {
Map<String, CacheZone> cacheZones;
try {
- cacheZones = this.getCacheZones(groupList, allId2GroupInfos, allTag2ClusterInfos);
+ cacheZones = this.getCacheZones(groupList, allId2GroupInfos, validClusterInfos);
} catch (Throwable t) {
- LOGGER.error("fail to get cacheZones of cluster {}, task {}", cluster, task);
+ LOGGER.error("fail to get cacheZones of clusterName {}, task {}", clusterName, task);
return;
}
CacheZoneConfig config = CacheZoneConfig.builder()
.cacheZones(cacheZones)
- .sortClusterName(cluster)
+ .sortClusterName(clusterName)
.sortTaskId(task)
.build();
String jsonStr = GSON.toJson(config);
@@ -229,8 +247,8 @@ public class SortSourceServiceImpl implements SortSourceService {
task2Md5.put(task, md5);
});
- newConfigMap.put(cluster, task2Config);
- newMd5Map.put(cluster, task2Md5);
+ newConfigMap.put(clusterName, task2Config);
+ newMd5Map.put(clusterName, task2Md5);
});
sortSourceConfigMap = newConfigMap;