You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 11:20:57 UTC

[inlong] branch master updated: [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 69ed8d0bb [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)
69ed8d0bb is described below

commit 69ed8d0bbad4bb2226a5856c4b75d392300d700d
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Tue Nov 8 19:20:52 2022 +0800

    [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)
---
 .../resources/mappers/InlongStreamEntityMapper.xml |  3 +-
 .../resources/mappers/StreamSinkEntityMapper.xml   |  1 +
 .../pojo/sort/standalone/SortSourceStreamInfo.java | 23 ++++++
 .../sort/standalone/SortSourceStreamSinkInfo.java  |  1 +
 .../service/core/impl/SortSourceServiceImpl.java   | 96 +++++++++++++---------
 .../manager/service/sort/SortServiceImplTest.java  | 24 +++---
 6 files changed, 97 insertions(+), 51 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
index 231769ccc..9fec7e7cd 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
@@ -289,7 +289,8 @@
         select
                inlong_group_id,
                inlong_stream_id,
-               mq_resource
+               mq_resource,
+               ext_params
         from inlong_stream
         where is_deleted = 0
     </select>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index f6422643c..996c85cf4 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -372,6 +372,7 @@
         select inlong_cluster_name as sortClusterName,
                sort_task_name,
                inlong_group_id     as groupId,
+               inlong_stream_id    as streamId,
                ext_params
         from stream_sink
         where is_deleted = 0
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
index bd89a573c..294212c27 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
@@ -17,11 +17,34 @@
 
 package org.apache.inlong.manager.pojo.sort.standalone;
 
+import com.google.gson.Gson;
 import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Data
 public class SortSourceStreamInfo {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceStreamInfo.class);
+    private static final Gson GSON = new Gson();
+
     private String inlongGroupId;
     private String inlongStreamId;
     private String mqResource;
+    String extParams;
+    Map<String, String> extParamsMap = new ConcurrentHashMap<>();
+
+    public Map<String, String> getExtParamsMap() {
+        if (extParamsMap.isEmpty() && StringUtils.isNotBlank(extParams)) {
+            try {
+                extParamsMap = GSON.fromJson(extParams, Map.class);
+            } catch (Throwable t) {
+                LOGGER.error("fail to parse group ext params", t);
+            }
+        }
+        return extParamsMap;
+    }
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
index d144b2574..917494252 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
@@ -38,6 +38,7 @@ public class SortSourceStreamSinkInfo {
     String sortClusterName;
     String sortTaskName;
     String groupId;
+    String streamId;
     String extParams;
     Map<String, String> extParamsMap;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 4ec62c36d..df92a55d9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -50,7 +50,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -95,11 +94,11 @@ public class SortSourceServiceImpl implements SortSourceService {
 
     private Map<String, List<SortSourceClusterInfo>> mqClusters;
     private Map<String, SortSourceGroupInfo> groupInfos;
-    private Map<String, SortSourceStreamInfo> allStreams;
+    private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
     private Map<String, String> backupClusterTag;
     private Map<String, String> backupGroupMqResource;
-    private Map<String, String> backupStreamMqResource;
-    private Map<String, Map<String, List<String>>> groupMap;
+    private Map<String, Map<String, String>> backupStreamMqResource;
+    private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> streamSinkMap;
 
     @Autowired
     private SortConfigLoader configLoader;
@@ -195,16 +194,16 @@ public class SortSourceServiceImpl implements SortSourceService {
 
         // reload all stream sinks, to Map<clusterName, Map<taskName, List<streamSinkInfo>>> format
         List<SortSourceStreamSinkInfo> allStreamSinks = configLoader.loadAllStreamSinks();
-        groupMap = new HashMap<>();
+        streamSinkMap = new HashMap<>();
         allStreamSinks.stream()
                 .filter(sink -> sink.getSortClusterName() != null)
                 .filter(sink -> sink.getSortTaskName() != null)
                 .forEach(sink -> {
-                    Map<String, List<String>> task2groupsMap =
-                            groupMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
-                    List<String> groupIdList =
+                    Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap =
+                            streamSinkMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
+                    List<SortSourceStreamSinkInfo> sinkInfoList =
                             task2groupsMap.computeIfAbsent(sink.getSortTaskName(), k -> new ArrayList<>());
-                    groupIdList.add(sink.getGroupId());
+                    sinkInfoList.add(sink);
                 });
 
         // reload all groups
@@ -225,12 +224,15 @@ public class SortSourceServiceImpl implements SortSourceService {
         // reload all streams
         allStreams = configLoader.loadAllStreams()
                 .stream()
-                .collect(Collectors.toMap(SortSourceStreamInfo::getInlongGroupId, stream -> stream));
+                .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
+                        Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
 
         // reload all back up stream mq resource
         backupStreamMqResource = configLoader.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE)
                 .stream()
-                .collect(Collectors.toMap(InlongStreamExtEntity::getInlongGroupId, InlongStreamExtEntity::getKeyValue));
+                .collect(Collectors.groupingBy(InlongStreamExtEntity::getInlongGroupId,
+                        Collectors.toMap(InlongStreamExtEntity::getInlongStreamId,
+                                InlongStreamExtEntity::getKeyValue)));
     }
 
     private void parseAll() {
@@ -239,12 +241,12 @@ public class SortSourceServiceImpl implements SortSourceService {
         Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
         Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
 
-        groupMap.forEach((sortClusterName, task2GroupList) -> {
+        streamSinkMap.forEach((sortClusterName, task2SinkList) -> {
             // prepare the new config and md5
             Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
             Map<String, String> task2Md5 = new ConcurrentHashMap<>();
 
-            task2GroupList.forEach((taskName, groupList) -> {
+            task2SinkList.forEach((taskName, sinkList) -> {
                 try {
                     CacheZoneConfig cacheZoneConfig =
                             CacheZoneConfig.builder()
@@ -252,7 +254,7 @@ public class SortSourceServiceImpl implements SortSourceService {
                                     .sortTaskId(taskName)
                                     .build();
                     Map<String, CacheZone> cacheZoneMap =
-                            this.parseCacheZones(sortClusterName, taskName, groupList);
+                            this.parseCacheZones(sinkList);
                     cacheZoneConfig.setCacheZones(cacheZoneMap);
 
                     // prepare md5
@@ -277,31 +279,33 @@ public class SortSourceServiceImpl implements SortSourceService {
         backupClusterTag = null;
         backupGroupMqResource = null;
         backupStreamMqResource = null;
-        groupMap = null;
+        streamSinkMap = null;
     }
 
     private Map<String, CacheZone> parseCacheZones(
-            String sortClusterName,
-            String taskName,
-            List<String> groupIdList) {
+            List<SortSourceStreamSinkInfo> sinkList) {
 
         // keep only the sinks whose group and stream are both known
-        List<SortSourceGroupInfo> groupInfoList = groupIdList.stream()
-                .filter(groupInfos::containsKey)
-                .map(groupInfos::get)
+        List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
+                .filter(sinkInfo -> groupInfos.containsKey(sinkInfo.getGroupId())
+                                && allStreams.containsKey(sinkInfo.getGroupId())
+                                && allStreams.get(sinkInfo.getGroupId()).containsKey(sinkInfo.getStreamId()))
                 .collect(Collectors.toList());
 
         // group them by cluster tag.
-        Map<String, List<SortSourceGroupInfo>> tag2GroupInfos = groupInfoList.stream()
-                .collect(Collectors.groupingBy(SortSourceGroupInfo::getClusterTag));
+        Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
+                .collect(Collectors.groupingBy(sink -> {
+                    SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId());
+                    return groupInfo.getClusterTag();
+                }));
 
         // group them by backup cluster tag.
-        Map<String, List<SortSourceGroupInfo>> backupTag2GroupInfos = groupInfoList.stream()
+        Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
                 .filter(info -> backupClusterTag.containsKey(info.getGroupId()))
                 .collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId())));
 
-        List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2GroupInfos, false);
-        List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2GroupInfos, true);
+        List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false);
+        List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2SinkInfos, true);
 
         return Stream.of(cacheZones, backupCacheZones)
                 .flatMap(Collection::stream)
@@ -315,18 +319,20 @@ public class SortSourceServiceImpl implements SortSourceService {
                 );
     }
 
-    private List<CacheZone> parseCacheZonesByTag(Map<String, List<SortSourceGroupInfo>> tag2Groups, boolean isBackup) {
+    private List<CacheZone> parseCacheZonesByTag(
+            Map<String, List<SortSourceStreamSinkInfo>> tag2Sinks,
+            boolean isBackup) {
 
-        return tag2Groups.keySet().stream()
+        return tag2Sinks.keySet().stream()
                 .filter(mqClusters::containsKey)
                 .flatMap(tag -> {
-                    List<SortSourceGroupInfo> groups = tag2Groups.get(tag);
+                    List<SortSourceStreamSinkInfo> sinks = tag2Sinks.get(tag);
                     List<SortSourceClusterInfo> clusters = mqClusters.get(tag);
                     return clusters.stream()
                             .map(cluster -> {
                                 CacheZone zone = null;
                                 try {
-                                    zone = this.parseCacheZone(groups, cluster, isBackup);
+                                    zone = this.parseCacheZone(sinks, cluster, isBackup);
                                 } catch (IllegalStateException e) {
                                     LOGGER.error("fail to init cache zone for cluster " + cluster, e);
                                 }
@@ -337,11 +343,11 @@ public class SortSourceServiceImpl implements SortSourceService {
     }
 
     private CacheZone parseCacheZone(
-            List<SortSourceGroupInfo> groups,
+            List<SortSourceStreamSinkInfo> sinks,
             SortSourceClusterInfo cluster,
             boolean isBackupTag) {
         switch (cluster.getType()) {
-            case ClusterType.PULSAR: return parsePulsarZone(groups, cluster, isBackupTag);
+            case ClusterType.PULSAR: return parsePulsarZone(sinks, cluster, isBackupTag);
             default:
                 throw new BusinessException(String.format("do not support cluster type=%s of cluster=%s",
                         cluster.getType(), cluster));
@@ -349,24 +355,34 @@ public class SortSourceServiceImpl implements SortSourceService {
     }
 
     private CacheZone parsePulsarZone(
-            List<SortSourceGroupInfo> groups,
+            List<SortSourceStreamSinkInfo> sinks,
             SortSourceClusterInfo cluster,
             boolean isBackupTag) {
         Map<String, String> param = cluster.getExtParamsMap();
         String tenant = param.get(KEY_TENANT);
         String auth = param.get(KEY_AUTH);
-        List<Topic> sdkTopics = groups.stream()
-                .map(info -> {
-                    String namespace = info.getMqResource();
-                    String topic = allStreams.get(info.getGroupId()).getMqResource();
+        List<Topic> sdkTopics = sinks.stream()
+                .map(sink -> {
+                    String groupId = sink.getGroupId();
+                    String streamId = sink.getStreamId();
+                    SortSourceGroupInfo groupInfo = groupInfos.get(groupId);
+                    SortSourceStreamInfo streamInfo = allStreams.get(groupId).get(streamId);
+
+                    String namespace = groupInfo.getMqResource();
+                    String topic = streamInfo.getMqResource();
                     if (isBackupTag) {
-                        namespace = Optional.ofNullable(backupGroupMqResource.get(info.getGroupId())).orElse(namespace);
-                        topic = Optional.ofNullable(backupStreamMqResource.get(info.getGroupId())).orElse(topic);
+                        if (backupGroupMqResource.containsKey(groupId)) {
+                            namespace = backupGroupMqResource.get(groupId);
+                        }
+                        if (backupStreamMqResource.containsKey(groupId)
+                                && backupStreamMqResource.get(groupId).containsKey(streamId)) {
+                            topic = backupStreamMqResource.get(groupId).get(streamId);
+                        }
                     }
                     String fullTopic = tenant.concat("/").concat(namespace).concat("/").concat(topic);
                     return Topic.builder()
                             .topic(fullTopic)
-                            .topicProperties(info.getExtParamsMap())
+                            .topicProperties(streamInfo.getExtParamsMap())
                             .build();
                 })
                 .collect(Collectors.toList());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 338ee3276..d52b47486 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -69,10 +69,12 @@ public class SortServiceImplTest extends ServiceBaseTest {
     private static final String TEST_CLUSTER = "testCluster";
     private static final String TEST_TASK = "testTask";
     private static final String TEST_GROUP = "testGroup";
-    private static final String TEST_STREAM = "1";
+    private static final String TEST_STREAM_1 = "1";
+    private static final String TEST_STREAM_2 = "2";
     private static final String TEST_TAG = "testTag";
     private static final String BACK_UP_TAG = "testBackupTag";
-    private static final String TEST_TOPIC = "testTopic";
+    private static final String TEST_TOPIC_1 = "testTopic";
+    private static final String TEST_TOPIC_2 = "testTopic2";
     private static final String TEST_SINK_TYPE = "testSinkType";
     private static final String TEST_CREATOR = "testUser";
 
@@ -204,11 +206,13 @@ public class SortServiceImplTest extends ServiceBaseTest {
     private void prepareAll() {
         this.prepareCluster(TEST_CLUSTER);
         this.preparePulsar("testPulsar", true, TEST_TAG);
-        this.preparePulsar("testPulsar2", true, BACK_UP_TAG);
+        this.preparePulsar("backupPulsar", true, BACK_UP_TAG);
         this.prepareDataNode(TEST_TASK);
         this.prepareGroupId(TEST_GROUP);
-        this.prepareStreamId(TEST_GROUP, TEST_STREAM);
-        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER);
+        this.prepareStreamId(TEST_GROUP, TEST_STREAM_1, TEST_TOPIC_1);
+        this.prepareStreamId(TEST_GROUP, TEST_STREAM_2, TEST_TOPIC_2);
+        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_1);
+        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_2);
     }
 
     private void prepareDataNode(String taskName) {
@@ -255,12 +259,12 @@ public class SortServiceImplTest extends ServiceBaseTest {
         groupService.save(request, "test operator");
     }
 
-    private void prepareStreamId(String groupId, String streamId) {
+    private void prepareStreamId(String groupId, String streamId, String topic) {
         InlongStreamRequest request = new InlongStreamRequest();
         request.setInlongGroupId(groupId);
         request.setInlongStreamId(streamId);
         request.setName("test_stream_name");
-        request.setMqResource(TEST_TOPIC);
+        request.setMqResource(topic);
         request.setVersion(InlongConstants.INITIAL_VERSION);
         List<InlongStreamExtInfo> extInfos = new ArrayList<>();
         InlongStreamExtInfo ext = new InlongStreamExtInfo();
@@ -268,7 +272,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
         ext.setInlongStreamId(streamId);
         ext.setInlongGroupId(groupId);
         ext.setKeyName(ClusterSwitch.BACKUP_MQ_RESOURCE);
-        ext.setKeyValue("backup_topic");
+        ext.setKeyValue("backup_" + topic);
         request.setExtList(extInfos);
         streamService.save(request, "test_operator");
     }
@@ -305,7 +309,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
         clusterService.save(request, "test operator");
     }
 
-    private void prepareTask(String taskName, String groupId, String clusterName) {
+    private void prepareTask(String taskName, String groupId, String clusterName, String streamId) {
         SinkRequest request = new HiveSinkRequest();
         request.setDataNodeName(taskName);
         request.setSinkType(SinkType.HIVE);
@@ -313,7 +317,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
         request.setSinkName(taskName);
         request.setSortTaskName(taskName);
         request.setInlongGroupId(groupId);
-        request.setInlongStreamId("1");
+        request.setInlongStreamId(streamId);
         Map<String, Object> properties = new HashMap<>();
         properties.put("delimiter", "|");
         properties.put("dataType", "text");