You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/30 04:39:27 UTC
[inlong] branch master updated: [INLONG-7254][Manager] Fix config error when InlongGroupId is in the process of switching (#7255)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c57b94279 [INLONG-7254][Manager] Fix config error when InlongGroupId is in the process of switching (#7255)
c57b94279 is described below
commit c57b9427931e293e8f1ac8d8efd9522cc0113eaa
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Mon Jan 30 12:39:21 2023 +0800
[INLONG-7254][Manager] Fix config error when InlongGroupId is in the process of switching (#7255)
---
.../service/core/impl/SortSourceServiceImpl.java | 26 +++++++-
.../manager/service/sort/SortServiceImplTest.java | 78 ++++++++++++++++++----
2 files changed, 88 insertions(+), 16 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index e52a9bfb4..dd772c18c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -28,6 +28,7 @@ import org.apache.inlong.common.pojo.sdk.Topic;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
@@ -94,6 +95,7 @@ public class SortSourceServiceImpl implements SortSourceService {
*/
private Map<String, Map<String, CacheZoneConfig>> sortSourceConfigMap = new ConcurrentHashMap<>();
+ private Map<String, SortSourceClusterInfo> sortClusters;
private Map<String, List<SortSourceClusterInfo>> mqClusters;
private Map<String, SortSourceGroupInfo> groupInfos;
private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
@@ -187,6 +189,8 @@ public class SortSourceServiceImpl implements SortSourceService {
// reload mq cluster and sort cluster
List<SortSourceClusterInfo> allClusters = configLoader.loadAllClusters();
+ sortClusters = allClusters.stream()
+ .collect(Collectors.toMap(SortSourceClusterInfo::getName, v -> v));
// group mq clusters by cluster tag
mqClusters = allClusters.stream()
@@ -256,7 +260,7 @@ public class SortSourceServiceImpl implements SortSourceService {
.sortTaskId(taskName)
.build();
Map<String, CacheZone> cacheZoneMap =
- this.parseCacheZones(sinkList);
+ this.parseCacheZones(sortClusterName, sinkList);
cacheZoneConfig.setCacheZones(cacheZoneMap);
// prepare md5
@@ -275,6 +279,7 @@ public class SortSourceServiceImpl implements SortSourceService {
});
sortSourceConfigMap = newConfigMap;
sortSourceMd5Map = newMd5Map;
+ sortClusters = null;
mqClusters = null;
groupInfos = null;
allStreams = null;
@@ -285,8 +290,12 @@ public class SortSourceServiceImpl implements SortSourceService {
}
private Map<String, CacheZone> parseCacheZones(
+ String clusterName,
List<SortSourceStreamSinkInfo> sinkList) {
+ Preconditions.checkNotNull(sortClusters.get(clusterName), "sort cluster should not be NULL");
+ String sortClusterTag = sortClusters.get(clusterName).getClusterTags();
+
// get group infos
List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
.filter(sinkInfo -> groupInfos.containsKey(sinkInfo.getGroupId())
@@ -296,6 +305,13 @@ public class SortSourceServiceImpl implements SortSourceService {
// group them by cluster tag.
Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
+ .filter(sink -> Objects.nonNull(groupInfos.get(sink.getGroupId())))
+ .filter(sink -> {
+ if (StringUtils.isBlank(sortClusterTag)) {
+ return true;
+ }
+ return sortClusterTag.equals(groupInfos.get(sink.getGroupId()).getClusterTag());
+ })
.collect(Collectors.groupingBy(sink -> {
SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId());
return groupInfo.getClusterTag();
@@ -303,7 +319,13 @@ public class SortSourceServiceImpl implements SortSourceService {
// group them by second cluster tag.
Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
- .filter(info -> backupClusterTag.containsKey(info.getGroupId()))
+ .filter(sink -> backupClusterTag.containsKey(sink.getGroupId()))
+ .filter(sink -> {
+ if (StringUtils.isBlank(sortClusterTag)) {
+ return true;
+ }
+ return sortClusterTag.equals(backupClusterTag.get(sink.getGroupId()));
+ })
.collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId())));
List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 9d4d417af..17ee08065 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -18,6 +18,8 @@
package org.apache.inlong.manager.service.sort;
import org.apache.inlong.common.constant.ClusterSwitch;
+import org.apache.inlong.common.pojo.sdk.CacheZone;
+import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.manager.common.consts.DataNodeType;
@@ -67,8 +69,12 @@ import java.util.Map;
public class SortServiceImplTest extends ServiceBaseTest {
private static final String TEST_GROUP = "test-group";
- private static final String TEST_CLUSTER = "test-cluster";
- private static final String TEST_TASK = "test-task";
+ private static final String TEST_CLUSTER_1 = "test-cluster-1";
+ private static final String TEST_CLUSTER_2 = "test-cluster-2";
+ private static final String TEST_CLUSTER_3 = "test-cluster-3";
+ private static final String TEST_TASK_1 = "test-task-1";
+ private static final String TEST_TASK_2 = "test-task-2";
+ private static final String TEST_TASK_3 = "test-task-3";
private static final String TEST_STREAM_1 = "1";
private static final String TEST_STREAM_2 = "2";
private static final String TEST_TAG = "testTag";
@@ -114,8 +120,8 @@ public class SortServiceImplTest extends ServiceBaseTest {
@Test
@Order(2)
@Transactional
- public void testSourceCorrectParams() {
- SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER, TEST_TASK, "");
+ public void testSourceCorrectParamsOfNoTagSortCluster() {
+ SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
JSONObject jo = new JSONObject(response);
System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
@@ -128,9 +134,9 @@ public class SortServiceImplTest extends ServiceBaseTest {
@Order(3)
@Transactional
public void testSourceSameMd5() {
- SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER, TEST_TASK, "");
+ SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, "");
String md5 = response.getMd5();
- response = sortService.getSourceConfig(TEST_CLUSTER, TEST_TASK, md5);
+ response = sortService.getSourceConfig(TEST_CLUSTER_1, TEST_TASK_1, md5);
System.out.println(response);
Assertions.assertEquals(1, response.getCode());
Assertions.assertEquals(md5, response.getMd5());
@@ -167,7 +173,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
@Order(6)
@Transactional
public void testClusterCorrectParams() {
- SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER, "");
+ SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER_1, "");
JSONObject jo = new JSONObject(response);
System.out.println(jo);
Assertions.assertEquals(0, response.getCode());
@@ -180,9 +186,9 @@ public class SortServiceImplTest extends ServiceBaseTest {
@Order(7)
@Transactional
public void testClusterSameMd5() {
- SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER, "");
+ SortClusterResponse response = sortService.getClusterConfig(TEST_CLUSTER_1, "");
String md5 = response.getMd5();
- response = sortService.getClusterConfig(TEST_CLUSTER, md5);
+ response = sortService.getClusterConfig(TEST_CLUSTER_1, md5);
System.out.println(response);
Assertions.assertEquals(1, response.getCode());
Assertions.assertEquals(md5, response.getMd5());
@@ -202,17 +208,60 @@ public class SortServiceImplTest extends ServiceBaseTest {
Assertions.assertNotNull(response.getMsg());
}
+ @Test
+ @Order(9)
+ @Transactional
+ public void testSourceCorrectParamsOfTaggedSortCluster() {
+ SortSourceConfigResponse response = sortService.getSourceConfig(TEST_CLUSTER_2, TEST_TASK_2, "");
+ JSONObject jo = new JSONObject(response);
+ System.out.println(jo);
+ Assertions.assertEquals(0, response.getCode());
+ Assertions.assertNotNull(response.getMd5());
+ Assertions.assertNotNull(response.getMsg());
+ CacheZoneConfig config = response.getData();
+ Assertions.assertNotNull(config);
+ Assertions.assertEquals(TEST_CLUSTER_2, config.getSortClusterName());
+ Assertions.assertEquals(TEST_TASK_2, config.getSortTaskId());
+ Assertions.assertEquals(1, config.getCacheZones().size());
+ CacheZone zone = config.getCacheZones().get("testPulsar");
+ Assertions.assertNotNull(zone);
+ Assertions.assertEquals("testPulsar", zone.getZoneName());
+ Assertions.assertEquals(1, zone.getTopics().size());
+
+ response = sortService.getSourceConfig(TEST_CLUSTER_3, TEST_TASK_3, "");
+ jo = new JSONObject(response);
+ System.out.println(jo);
+ Assertions.assertEquals(0, response.getCode());
+ Assertions.assertNotNull(response.getMd5());
+ Assertions.assertNotNull(response.getMsg());
+ config = response.getData();
+ Assertions.assertNotNull(config);
+ Assertions.assertEquals(TEST_CLUSTER_3, config.getSortClusterName());
+ Assertions.assertEquals(TEST_TASK_3, config.getSortTaskId());
+ Assertions.assertEquals(1, config.getCacheZones().size());
+ zone = config.getCacheZones().get("backupPulsar");
+ Assertions.assertNotNull(zone);
+ Assertions.assertEquals("backupPulsar", zone.getZoneName());
+ Assertions.assertEquals(1, zone.getTopics().size());
+ }
+
@BeforeEach
private void prepareAll() {
- this.prepareCluster(TEST_CLUSTER);
+ this.prepareCluster(TEST_CLUSTER_1, null);
+ this.prepareCluster(TEST_CLUSTER_2, TEST_TAG);
+ this.prepareCluster(TEST_CLUSTER_3, BACK_UP_TAG);
this.preparePulsar("testPulsar", true, TEST_TAG);
this.preparePulsar("backupPulsar", true, BACK_UP_TAG);
- this.prepareDataNode(TEST_TASK);
+ this.prepareDataNode(TEST_TASK_1);
+ this.prepareDataNode(TEST_TASK_2);
+ this.prepareDataNode(TEST_TASK_3);
this.prepareGroupId(TEST_GROUP);
this.prepareStreamId(TEST_GROUP, TEST_STREAM_1, TEST_TOPIC_1);
this.prepareStreamId(TEST_GROUP, TEST_STREAM_2, TEST_TOPIC_2);
- this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_1);
- this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_2);
+ this.prepareTask(TEST_TASK_1, TEST_GROUP, TEST_CLUSTER_1, TEST_STREAM_1);
+ this.prepareTask(TEST_TASK_1, TEST_GROUP, TEST_CLUSTER_1, TEST_STREAM_2);
+ this.prepareTask(TEST_TASK_2, TEST_GROUP, TEST_CLUSTER_2, TEST_STREAM_1);
+ this.prepareTask(TEST_TASK_3, TEST_GROUP, TEST_CLUSTER_3, TEST_STREAM_2);
}
private void prepareDataNode(String taskName) {
@@ -277,13 +326,14 @@ public class SortServiceImplTest extends ServiceBaseTest {
streamService.save(request, "test_operator");
}
- private void prepareCluster(String clusterName) {
+ private void prepareCluster(String clusterName, String clusterTag) {
InlongClusterEntity entity = new InlongClusterEntity();
entity.setName(clusterName);
entity.setType(DataNodeType.HIVE);
entity.setExtParams("{}");
entity.setCreator(TEST_CREATOR);
entity.setInCharges(TEST_CREATOR);
+ entity.setClusterTags(clusterTag);
Date now = new Date();
entity.setCreateTime(now);
entity.setModifyTime(now);