You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/07/17 14:40:50 UTC

[GitHub] [inlong] vernedeng opened a new pull request, #5090: [INLONG-5088][Manager] SortSource Service support only consume the MQ cluster with the same tag

vernedeng opened a new pull request, #5090:
URL: https://github.com/apache/inlong/pull/5090

   [INLONG-5088][Manager]  SortSource Service support only consume the MQ cluster with the same tag
   - Fixes #5088 
   
   ### Motivation
   
   Current getSortSource API will return all the stream of the given task, without consider whether the MQ cluster of these streams match the task or not.
   
   
   ### Modifications
   
   SortSource Service support only consume the MQ cluster with the same tag
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is already covered by existing tests, such as:
   SortServiceImplTest
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] vernedeng commented on a diff in pull request #5090: [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag

Posted by GitBox <gi...@apache.org>.
vernedeng commented on code in PR #5090:
URL: https://github.com/apache/inlong/pull/5090#discussion_r922945347


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java:
##########
@@ -202,25 +202,43 @@ private void reloadAllSourceConfig() {
                 .filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType()))
                 .collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
 
+        // group clusters by name.
+        Map<String, SortSourceClusterInfo> name2ClusterInfos = clusterInfos.stream()
+                .collect(Collectors.toMap(SortSourceClusterInfo::getName, info -> info, (g1, g2) -> g1));
+
         // Prepare CacheZones for each cluster and task
         Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
         Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
-        groupMap.forEach((cluster, task2Group) -> {
-
+        groupMap.forEach((clusterName, task2Group) -> {
+
+            // if there is no matched cluster name, just skip
+            if (!name2ClusterInfos.containsKey(clusterName)) {
+                return;
+            }
+            // find valid mq cluster list
+            String clusterTag = name2ClusterInfos.get(clusterName).getClusterTags();
+            final Map<String, List<SortSourceClusterInfo>> validClusterInfos = new ConcurrentHashMap<>();
+            if (allTag2ClusterInfos.containsKey(clusterTag)) {
+                validClusterInfos.put(clusterTag, allTag2ClusterInfos.get(clusterTag));
+            } else {
+                validClusterInfos.putAll(allTag2ClusterInfos);
+            }
+
+            // prepare

Review Comment:
   fixed, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] EMsnap commented on a diff in pull request #5090: [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag

Posted by GitBox <gi...@apache.org>.
EMsnap commented on code in PR #5090:
URL: https://github.com/apache/inlong/pull/5090#discussion_r922942575


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java:
##########
@@ -202,25 +202,43 @@ private void reloadAllSourceConfig() {
                 .filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType()))
                 .collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
 
+        // group clusters by name.
+        Map<String, SortSourceClusterInfo> name2ClusterInfos = clusterInfos.stream()
+                .collect(Collectors.toMap(SortSourceClusterInfo::getName, info -> info, (g1, g2) -> g1));
+
         // Prepare CacheZones for each cluster and task
         Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
         Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
-        groupMap.forEach((cluster, task2Group) -> {
-
+        groupMap.forEach((clusterName, task2Group) -> {
+
+            // if there is no matched cluster name, just skip
+            if (!name2ClusterInfos.containsKey(clusterName)) {
+                return;
+            }
+            // find valid mq cluster list
+            String clusterTag = name2ClusterInfos.get(clusterName).getClusterTags();
+            final Map<String, List<SortSourceClusterInfo>> validClusterInfos = new ConcurrentHashMap<>();
+            if (allTag2ClusterInfos.containsKey(clusterTag)) {
+                validClusterInfos.put(clusterTag, allTag2ClusterInfos.get(clusterTag));
+            } else {
+                validClusterInfos.putAll(allTag2ClusterInfos);
+            }
+
+            // prepare

Review Comment:
   pls make comment more clear



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow merged pull request #5090: [INLONG-5088][Manager] Support only consumes the MQ cluster with the same tag

Posted by GitBox <gi...@apache.org>.
healchow merged PR #5090:
URL: https://github.com/apache/inlong/pull/5090


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org