You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 11:20:57 UTC
[inlong] branch master updated: [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 69ed8d0bb [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)
69ed8d0bb is described below
commit 69ed8d0bbad4bb2226a5856c4b75d392300d700d
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Tue Nov 8 19:20:52 2022 +0800
[INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)
---
.../resources/mappers/InlongStreamEntityMapper.xml | 3 +-
.../resources/mappers/StreamSinkEntityMapper.xml | 1 +
.../pojo/sort/standalone/SortSourceStreamInfo.java | 23 ++++++
.../sort/standalone/SortSourceStreamSinkInfo.java | 1 +
.../service/core/impl/SortSourceServiceImpl.java | 96 +++++++++++++---------
.../manager/service/sort/SortServiceImplTest.java | 24 +++---
6 files changed, 97 insertions(+), 51 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
index 231769ccc..9fec7e7cd 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
@@ -289,7 +289,8 @@
select
inlong_group_id,
inlong_stream_id,
- mq_resource
+ mq_resource,
+ ext_params
from inlong_stream
where is_deleted = 0
</select>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index f6422643c..996c85cf4 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -372,6 +372,7 @@
select inlong_cluster_name as sortClusterName,
sort_task_name,
inlong_group_id as groupId,
+ inlong_stream_id as streamId,
ext_params
from stream_sink
where is_deleted = 0
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
index bd89a573c..294212c27 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
@@ -17,11 +17,34 @@
package org.apache.inlong.manager.pojo.sort.standalone;
+import com.google.gson.Gson;
import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
@Data
public class SortSourceStreamInfo {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceStreamInfo.class);
+ private static final Gson GSON = new Gson();
+
private String inlongGroupId;
private String inlongStreamId;
private String mqResource;
+ String extParams;
+ Map<String, String> extParamsMap = new ConcurrentHashMap<>();
+
+ public Map<String, String> getExtParamsMap() {
+ if (extParamsMap.isEmpty() && StringUtils.isNotBlank(extParams)) {
+ try {
+ extParamsMap = GSON.fromJson(extParams, Map.class);
+ } catch (Throwable t) {
+ LOGGER.error("fail to parse group ext params", t);
+ }
+ }
+ return extParamsMap;
+ }
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
index d144b2574..917494252 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
@@ -38,6 +38,7 @@ public class SortSourceStreamSinkInfo {
String sortClusterName;
String sortTaskName;
String groupId;
+ String streamId;
String extParams;
Map<String, String> extParamsMap;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 4ec62c36d..df92a55d9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -50,7 +50,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -95,11 +94,11 @@ public class SortSourceServiceImpl implements SortSourceService {
private Map<String, List<SortSourceClusterInfo>> mqClusters;
private Map<String, SortSourceGroupInfo> groupInfos;
- private Map<String, SortSourceStreamInfo> allStreams;
+ private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
private Map<String, String> backupClusterTag;
private Map<String, String> backupGroupMqResource;
- private Map<String, String> backupStreamMqResource;
- private Map<String, Map<String, List<String>>> groupMap;
+ private Map<String, Map<String, String>> backupStreamMqResource;
+ private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> streamSinkMap;
@Autowired
private SortConfigLoader configLoader;
@@ -195,16 +194,16 @@ public class SortSourceServiceImpl implements SortSourceService {
// reload all stream sinks, to Map<clusterName, Map<taskName, List<SortSourceStreamSinkInfo>>> format
List<SortSourceStreamSinkInfo> allStreamSinks = configLoader.loadAllStreamSinks();
- groupMap = new HashMap<>();
+ streamSinkMap = new HashMap<>();
allStreamSinks.stream()
.filter(sink -> sink.getSortClusterName() != null)
.filter(sink -> sink.getSortTaskName() != null)
.forEach(sink -> {
- Map<String, List<String>> task2groupsMap =
- groupMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
- List<String> groupIdList =
+ Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap =
+ streamSinkMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
+ List<SortSourceStreamSinkInfo> sinkInfoList =
task2groupsMap.computeIfAbsent(sink.getSortTaskName(), k -> new ArrayList<>());
- groupIdList.add(sink.getGroupId());
+ sinkInfoList.add(sink);
});
// reload all groups
@@ -225,12 +224,15 @@ public class SortSourceServiceImpl implements SortSourceService {
// reload all streams
allStreams = configLoader.loadAllStreams()
.stream()
- .collect(Collectors.toMap(SortSourceStreamInfo::getInlongGroupId, stream -> stream));
+ .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
+ Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
// reload all back up stream mq resource
backupStreamMqResource = configLoader.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE)
.stream()
- .collect(Collectors.toMap(InlongStreamExtEntity::getInlongGroupId, InlongStreamExtEntity::getKeyValue));
+ .collect(Collectors.groupingBy(InlongStreamExtEntity::getInlongGroupId,
+ Collectors.toMap(InlongStreamExtEntity::getInlongStreamId,
+ InlongStreamExtEntity::getKeyValue)));
}
private void parseAll() {
@@ -239,12 +241,12 @@ public class SortSourceServiceImpl implements SortSourceService {
Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
- groupMap.forEach((sortClusterName, task2GroupList) -> {
+ streamSinkMap.forEach((sortClusterName, task2SinkList) -> {
// prepare the new config and md5
Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
Map<String, String> task2Md5 = new ConcurrentHashMap<>();
- task2GroupList.forEach((taskName, groupList) -> {
+ task2SinkList.forEach((taskName, sinkList) -> {
try {
CacheZoneConfig cacheZoneConfig =
CacheZoneConfig.builder()
@@ -252,7 +254,7 @@ public class SortSourceServiceImpl implements SortSourceService {
.sortTaskId(taskName)
.build();
Map<String, CacheZone> cacheZoneMap =
- this.parseCacheZones(sortClusterName, taskName, groupList);
+ this.parseCacheZones(sinkList);
cacheZoneConfig.setCacheZones(cacheZoneMap);
// prepare md5
@@ -277,31 +279,33 @@ public class SortSourceServiceImpl implements SortSourceService {
backupClusterTag = null;
backupGroupMqResource = null;
backupStreamMqResource = null;
- groupMap = null;
+ streamSinkMap = null;
}
private Map<String, CacheZone> parseCacheZones(
- String sortClusterName,
- String taskName,
- List<String> groupIdList) {
+ List<SortSourceStreamSinkInfo> sinkList) {
// keep only the sinks whose group and stream have both been loaded
- List<SortSourceGroupInfo> groupInfoList = groupIdList.stream()
- .filter(groupInfos::containsKey)
- .map(groupInfos::get)
+ List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
+ .filter(sinkInfo -> groupInfos.containsKey(sinkInfo.getGroupId())
+ && allStreams.containsKey(sinkInfo.getGroupId())
+ && allStreams.get(sinkInfo.getGroupId()).containsKey(sinkInfo.getStreamId()))
.collect(Collectors.toList());
// group them by cluster tag.
- Map<String, List<SortSourceGroupInfo>> tag2GroupInfos = groupInfoList.stream()
- .collect(Collectors.groupingBy(SortSourceGroupInfo::getClusterTag));
+ Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
+ .collect(Collectors.groupingBy(sink -> {
+ SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId());
+ return groupInfo.getClusterTag();
+ }));
// group them by second cluster tag.
- Map<String, List<SortSourceGroupInfo>> backupTag2GroupInfos = groupInfoList.stream()
+ Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
.filter(info -> backupClusterTag.containsKey(info.getGroupId()))
.collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId())));
- List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2GroupInfos, false);
- List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2GroupInfos, true);
+ List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false);
+ List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2SinkInfos, true);
return Stream.of(cacheZones, backupCacheZones)
.flatMap(Collection::stream)
@@ -315,18 +319,20 @@ public class SortSourceServiceImpl implements SortSourceService {
);
}
- private List<CacheZone> parseCacheZonesByTag(Map<String, List<SortSourceGroupInfo>> tag2Groups, boolean isBackup) {
+ private List<CacheZone> parseCacheZonesByTag(
+ Map<String, List<SortSourceStreamSinkInfo>> tag2Sinks,
+ boolean isBackup) {
- return tag2Groups.keySet().stream()
+ return tag2Sinks.keySet().stream()
.filter(mqClusters::containsKey)
.flatMap(tag -> {
- List<SortSourceGroupInfo> groups = tag2Groups.get(tag);
+ List<SortSourceStreamSinkInfo> sinks = tag2Sinks.get(tag);
List<SortSourceClusterInfo> clusters = mqClusters.get(tag);
return clusters.stream()
.map(cluster -> {
CacheZone zone = null;
try {
- zone = this.parseCacheZone(groups, cluster, isBackup);
+ zone = this.parseCacheZone(sinks, cluster, isBackup);
} catch (IllegalStateException e) {
LOGGER.error("fail to init cache zone for cluster " + cluster, e);
}
@@ -337,11 +343,11 @@ public class SortSourceServiceImpl implements SortSourceService {
}
private CacheZone parseCacheZone(
- List<SortSourceGroupInfo> groups,
+ List<SortSourceStreamSinkInfo> sinks,
SortSourceClusterInfo cluster,
boolean isBackupTag) {
switch (cluster.getType()) {
- case ClusterType.PULSAR: return parsePulsarZone(groups, cluster, isBackupTag);
+ case ClusterType.PULSAR: return parsePulsarZone(sinks, cluster, isBackupTag);
default:
throw new BusinessException(String.format("do not support cluster type=%s of cluster=%s",
cluster.getType(), cluster));
@@ -349,24 +355,34 @@ public class SortSourceServiceImpl implements SortSourceService {
}
private CacheZone parsePulsarZone(
- List<SortSourceGroupInfo> groups,
+ List<SortSourceStreamSinkInfo> sinks,
SortSourceClusterInfo cluster,
boolean isBackupTag) {
Map<String, String> param = cluster.getExtParamsMap();
String tenant = param.get(KEY_TENANT);
String auth = param.get(KEY_AUTH);
- List<Topic> sdkTopics = groups.stream()
- .map(info -> {
- String namespace = info.getMqResource();
- String topic = allStreams.get(info.getGroupId()).getMqResource();
+ List<Topic> sdkTopics = sinks.stream()
+ .map(sink -> {
+ String groupId = sink.getGroupId();
+ String streamId = sink.getStreamId();
+ SortSourceGroupInfo groupInfo = groupInfos.get(groupId);
+ SortSourceStreamInfo streamInfo = allStreams.get(groupId).get(streamId);
+
+ String namespace = groupInfo.getMqResource();
+ String topic = streamInfo.getMqResource();
if (isBackupTag) {
- namespace = Optional.ofNullable(backupGroupMqResource.get(info.getGroupId())).orElse(namespace);
- topic = Optional.ofNullable(backupStreamMqResource.get(info.getGroupId())).orElse(topic);
+ if (backupGroupMqResource.containsKey(groupId)) {
+ namespace = backupGroupMqResource.get(groupId);
+ }
+ if (backupStreamMqResource.containsKey(groupId)
+ && backupStreamMqResource.get(groupId).containsKey(streamId)) {
+ topic = backupStreamMqResource.get(groupId).get(streamId);
+ }
}
String fullTopic = tenant.concat("/").concat(namespace).concat("/").concat(topic);
return Topic.builder()
.topic(fullTopic)
- .topicProperties(info.getExtParamsMap())
+ .topicProperties(streamInfo.getExtParamsMap())
.build();
})
.collect(Collectors.toList());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 338ee3276..d52b47486 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -69,10 +69,12 @@ public class SortServiceImplTest extends ServiceBaseTest {
private static final String TEST_CLUSTER = "testCluster";
private static final String TEST_TASK = "testTask";
private static final String TEST_GROUP = "testGroup";
- private static final String TEST_STREAM = "1";
+ private static final String TEST_STREAM_1 = "1";
+ private static final String TEST_STREAM_2 = "2";
private static final String TEST_TAG = "testTag";
private static final String BACK_UP_TAG = "testBackupTag";
- private static final String TEST_TOPIC = "testTopic";
+ private static final String TEST_TOPIC_1 = "testTopic";
+ private static final String TEST_TOPIC_2 = "testTopic2";
private static final String TEST_SINK_TYPE = "testSinkType";
private static final String TEST_CREATOR = "testUser";
@@ -204,11 +206,13 @@ public class SortServiceImplTest extends ServiceBaseTest {
private void prepareAll() {
this.prepareCluster(TEST_CLUSTER);
this.preparePulsar("testPulsar", true, TEST_TAG);
- this.preparePulsar("testPulsar2", true, BACK_UP_TAG);
+ this.preparePulsar("backupPulsar", true, BACK_UP_TAG);
this.prepareDataNode(TEST_TASK);
this.prepareGroupId(TEST_GROUP);
- this.prepareStreamId(TEST_GROUP, TEST_STREAM);
- this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER);
+ this.prepareStreamId(TEST_GROUP, TEST_STREAM_1, TEST_TOPIC_1);
+ this.prepareStreamId(TEST_GROUP, TEST_STREAM_2, TEST_TOPIC_2);
+ this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_1);
+ this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_2);
}
private void prepareDataNode(String taskName) {
@@ -255,12 +259,12 @@ public class SortServiceImplTest extends ServiceBaseTest {
groupService.save(request, "test operator");
}
- private void prepareStreamId(String groupId, String streamId) {
+ private void prepareStreamId(String groupId, String streamId, String topic) {
InlongStreamRequest request = new InlongStreamRequest();
request.setInlongGroupId(groupId);
request.setInlongStreamId(streamId);
request.setName("test_stream_name");
- request.setMqResource(TEST_TOPIC);
+ request.setMqResource(topic);
request.setVersion(InlongConstants.INITIAL_VERSION);
List<InlongStreamExtInfo> extInfos = new ArrayList<>();
InlongStreamExtInfo ext = new InlongStreamExtInfo();
@@ -268,7 +272,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
ext.setInlongStreamId(streamId);
ext.setInlongGroupId(groupId);
ext.setKeyName(ClusterSwitch.BACKUP_MQ_RESOURCE);
- ext.setKeyValue("backup_topic");
+ ext.setKeyValue("backup_" + topic);
request.setExtList(extInfos);
streamService.save(request, "test_operator");
}
@@ -305,7 +309,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
clusterService.save(request, "test operator");
}
- private void prepareTask(String taskName, String groupId, String clusterName) {
+ private void prepareTask(String taskName, String groupId, String clusterName, String streamId) {
SinkRequest request = new HiveSinkRequest();
request.setDataNodeName(taskName);
request.setSinkType(SinkType.HIVE);
@@ -313,7 +317,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
request.setSinkName(taskName);
request.setSortTaskName(taskName);
request.setInlongGroupId(groupId);
- request.setInlongStreamId("1");
+ request.setInlongStreamId(streamId);
Map<String, Object> properties = new HashMap<>();
properties.put("delimiter", "|");
properties.put("dataType", "text");