You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/15 11:05:16 UTC
[inlong] branch master updated: [INLONG-5030][Manager] Use SortConfigListener replace the LightGroupSortListener (#5032)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 926efc66d [INLONG-5030][Manager] Use SortConfigListener replace the LightGroupSortListener (#5032)
926efc66d is described below
commit 926efc66db7e1cb6398936cd1e781de563d587c5
Author: healchow <he...@gmail.com>
AuthorDate: Fri Jul 15 19:05:10 2022 +0800
[INLONG-5030][Manager] Use SortConfigListener replace the LightGroupSortListener (#5032)
---
.../common/pojo/source/tubemq/TubeMQSource.java | 2 +-
.../form/process/GroupResourceProcessForm.java | 21 +--
.../manager/service/sink/StreamSinkService.java | 12 ++
.../service/sink/StreamSinkServiceImpl.java | 18 ++
...perator.java => DefaultSortConfigOperator.java} | 189 ++++++++-------------
.../manager/service/sort/SortConfigListener.java | 3 +-
.../manager/service/sort/SortConfigOperator.java | 3 +-
.../service/sort/SortConfigOperatorFactory.java | 9 +-
.../service/sort/StreamSortConfigListener.java | 5 +-
.../service/sort/light/LightGroupSortListener.java | 152 -----------------
.../service/sort/light/LightGroupSortSelector.java | 51 ------
.../service/source/AbstractSourceOperator.java | 14 +-
.../service/source/StreamSourceOperator.java | 18 ++
.../service/source/StreamSourceService.java | 15 ++
.../service/source/StreamSourceServiceImpl.java | 55 ++++--
.../source/pulsar/PulsarSourceOperator.java | 64 +++++++
.../source/tubemq/TubeMQSourceOperator.java | 60 +++++--
.../listener/GroupTaskListenerFactory.java | 7 -
18 files changed, 309 insertions(+), 389 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java
index 2458a5509..74d85d09d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java
@@ -58,7 +58,7 @@ public class TubeMQSource extends StreamSource {
private String sessionKey;
/**
- * The tubemq consumers use this tid set to filter records reading from server.
+ * The TubeMQ consumers use this tid set to filter records reading from server.
*/
@ApiModelProperty("Tid of the TubeMQ")
private TreeSet<String> tid;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java
index 03c9e15cb..61547477e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java
@@ -17,16 +17,15 @@
package org.apache.inlong.manager.common.pojo.workflow.form.process;
-import com.google.common.collect.Maps;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.exceptions.FormValidateException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -41,22 +40,13 @@ public class GroupResourceProcessForm extends BaseProcessForm {
private InlongGroupInfo groupInfo;
- @Getter
- @Setter
- private GroupOperateType groupOperateType = GroupOperateType.INIT;
-
private List<InlongStreamInfo> streamInfos;
- public InlongGroupInfo getGroupInfo() {
- return groupInfo;
- }
-
- public void setGroupInfo(InlongGroupInfo groupInfo) {
- this.groupInfo = groupInfo;
- }
+ private GroupOperateType groupOperateType = GroupOperateType.INIT;
@Override
public void validate() throws FormValidateException {
+ Preconditions.checkNotNull(groupInfo, "InlongGroupInfo cannot be null");
}
@Override
@@ -71,9 +61,10 @@ public class GroupResourceProcessForm extends BaseProcessForm {
@Override
public Map<String, Object> showInList() {
- Map<String, Object> show = Maps.newHashMap();
+ Map<String, Object> show = new HashMap<>();
show.put("inlongGroupId", groupInfo.getInlongGroupId());
show.put("groupOperateType", this.groupOperateType);
return show;
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index c141a0aea..aad07b29f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -18,13 +18,16 @@
package org.apache.inlong.manager.service.sink;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import java.util.List;
+import java.util.Map;
/**
* Service layer interface for stream sink
@@ -66,6 +69,15 @@ public interface StreamSinkService {
*/
List<SinkBriefResponse> listBrief(String groupId, String streamId);
+ /**
+ * Get the StreamSink Map by the inlong group info and inlong stream info list.
+ *
+ * @param groupInfo inlong group info
+ * @param streamInfos inlong stream info list
+ * @return map of StreamSink list, key-inlongStreamId, value-StreamSinkList
+ */
+ Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos);
+
/**
* Query the number of undeleted sink info based on inlong group and inlong stream id
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 9327c26d7..cd9c64337 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -30,12 +30,14 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
@@ -53,9 +55,11 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* Implementation of sink service interface
@@ -161,6 +165,20 @@ public class StreamSinkServiceImpl implements StreamSinkService {
return summaryList;
}
+ @Override
+ public Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos) {
+ String groupId = groupInfo.getInlongGroupId();
+ LOGGER.debug("begin to get sink map for groupId={}", groupId);
+
+ List<StreamSink> streamSinks = this.listSink(groupId, null);
+ Map<String, List<StreamSink>> result = streamSinks.stream()
+ .collect(Collectors.groupingBy(StreamSink::getInlongStreamId, HashMap::new,
+ Collectors.toCollection(ArrayList::new)));
+
+ LOGGER.debug("success to get sink map, size={}, groupInfo={}", result.size(), groupInfo);
+ return result;
+ }
+
@Override
public PageInfo<? extends StreamSink> listByCondition(SinkPageRequest request) {
Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
similarity index 50%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
index 95674cee9..cf77c6c28 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
@@ -18,35 +18,25 @@
package org.apache.inlong.manager.service.sort;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.enums.SourceType;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
-import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.service.sort.util.NodeRelationUtils;
+import org.apache.inlong.manager.service.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.service.transform.StreamTransformService;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
import org.slf4j.Logger;
@@ -61,25 +51,24 @@ import java.util.Map;
import java.util.stream.Collectors;
/**
- * Sort config operator, used to create a Sort config for the InlongGroup in normal mode with ZK disabled.
+ * Default Sort config operator, used to create a Sort config for the InlongGroup with ZK disabled.
*/
@Service
-public class SortConfig4NormalGroupOperator implements SortConfigOperator {
+public class DefaultSortConfigOperator implements SortConfigOperator {
- private static final Logger LOGGER = LoggerFactory.getLogger(SortConfig4NormalGroupOperator.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Autowired
private StreamSourceService sourceService;
@Autowired
- private StreamSinkService sinkService;
+ private StreamTransformService transformService;
@Autowired
- private InlongClusterService clusterService;
+ private StreamSinkService sinkService;
@Override
- public Boolean accept(Integer isNormal, Integer enableZk) {
- return InlongConstants.NORMAL_MODE.equals(isNormal)
- && InlongConstants.DISABLE_ZK.equals(enableZk);
+ public Boolean accept(Integer enableZk) {
+ return InlongConstants.DISABLE_ZK.equals(enableZk);
}
@Override
@@ -90,32 +79,67 @@ public class SortConfig4NormalGroupOperator implements SortConfigOperator {
return;
}
- GroupInfo configInfo = this.createSortGroupInfo(groupInfo, streamInfos);
- String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
+ GroupInfo configInfo;
+ // if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+ configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
+ } else {
+ configInfo = this.getNormalGroupInfo(groupInfo, streamInfos);
+ }
+ String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
if (isStream) {
this.addToStreamExt(streamInfos, dataflow);
} else {
this.addToGroupExt(groupInfo, dataflow);
}
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow);
+ }
+ }
+
+ private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+ // get source info
+ Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
+ // get sink info
+ String groupId = groupInfo.getInlongGroupId();
+ Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
+
+ List<TransformResponse> transformResponses = transformService.listTransform(groupId, null);
+ Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
+ .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
+ Collectors.toCollection(ArrayList::new)));
+
+ List<StreamInfo> sortStreamInfos = new ArrayList<>();
+ for (InlongStreamInfo inlongStream : streamInfoList) {
+ String streamId = inlongStream.getInlongStreamId();
+
+ List<TransformResponse> transformResponseList = transformMap.get(streamId);
+ List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
+ transformResponseList, sinkMap.get(streamId));
+ StreamInfo streamInfo = new StreamInfo(streamId, nodes,
+ NodeRelationUtils.createNodeRelationsForStream(inlongStream));
+ sortStreamInfos.add(streamInfo);
+
+ // rebuild joinerNode relation
+ NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
+ }
+
+ return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
}
/**
- * Create GroupInfo for Sort protocol.
+ * Get Sort GroupInfo of normal inlong group.
*
* @see org.apache.inlong.sort.protocol.GroupInfo
*/
- private GroupInfo createSortGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
- String groupId = groupInfo.getInlongGroupId();
- List<StreamSink> streamSinks = sinkService.listSink(groupId, null);
- Map<String, List<StreamSink>> sinkMap = streamSinks.stream()
- .collect(Collectors.groupingBy(StreamSink::getInlongStreamId, HashMap::new,
- Collectors.toCollection(ArrayList::new)));
-
+ private GroupInfo getNormalGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
// get source info
- Map<String, List<StreamSource>> sourceMap;
- MQType.forType(groupInfo.getMqType());
- sourceMap = createMQSources(groupInfo, streamInfoList);
+ Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
+ // get sink info
+ Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
+
// create StreamInfo for Sort protocol
List<StreamInfo> sortStreamInfos = new ArrayList<>();
for (InlongStreamInfo inlongStream : streamInfoList) {
@@ -128,91 +152,7 @@ public class SortConfig4NormalGroupOperator implements SortConfigOperator {
sortStreamInfos.add(sortStream);
}
- return new GroupInfo(groupId, sortStreamInfos);
- }
-
- private Map<String, List<StreamSource>> createMQSources(
- InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
- if (MQType.forType(groupInfo.getMqType()) == MQType.TUBE) {
- return createTubeSource(groupInfo, streamInfoList);
- }
- return createPulsarSource(groupInfo, streamInfoList);
- }
-
- /**
- * Create Tube sources for Sort.
- */
- private Map<String, List<StreamSource>> createTubeSource(
- InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
- ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
- ClusterType.TUBE);
- TubeClusterInfo tubeClusterInfo = (TubeClusterInfo) clusterInfo;
- String masterRpc = tubeClusterInfo.getUrl();
- Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
- streamInfoList.forEach(streamInfo -> {
- TubeMQSource tubeMQSource = new TubeMQSource();
- String streamId = streamInfo.getInlongStreamId();
- tubeMQSource.setSourceName(streamId);
- tubeMQSource.setTopic(streamInfo.getMqResource());
- tubeMQSource.setGroupId(streamId);
- tubeMQSource.setMasterRpc(masterRpc);
- List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(), streamId);
- for (StreamSource sourceInfo : sourceInfos) {
- tubeMQSource.setSerializationType(sourceInfo.getSerializationType());
- }
- tubeMQSource.setFieldList(streamInfo.getFieldList());
- sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(tubeMQSource);
- });
-
- return sourceMap;
- }
-
- /**
- * Create Pulsar sources for Sort.
- */
- private Map<String, List<StreamSource>> createPulsarSource(
- InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
- ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
- ClusterType.PULSAR);
- PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
- String adminUrl = pulsarCluster.getAdminUrl();
- String serviceUrl = pulsarCluster.getUrl();
- String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
- ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
-
- Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
- streamInfoList.forEach(streamInfo -> {
- PulsarSource pulsarSource = new PulsarSource();
- String streamId = streamInfo.getInlongStreamId();
- pulsarSource.setSourceName(streamId);
- pulsarSource.setTenant(tenant);
- pulsarSource.setNamespace(groupInfo.getMqResource());
- pulsarSource.setTopic(streamInfo.getMqResource());
- pulsarSource.setAdminUrl(adminUrl);
- pulsarSource.setServiceUrl(serviceUrl);
- pulsarSource.setInlongComponent(true);
-
- List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(), streamId);
- for (StreamSource sourceInfo : sourceInfos) {
- if (StringUtils.isEmpty(pulsarSource.getSerializationType())
- && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
- pulsarSource.setSerializationType(sourceInfo.getSerializationType());
- }
- if (SourceType.forType(sourceInfo.getSourceType()) == SourceType.KAFKA) {
- pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
- }
- }
-
- // if the SerializationType is still null, set it to the CSV
- if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
- pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
- }
- pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
- pulsarSource.setFieldList(streamInfo.getFieldList());
- sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
- });
-
- return sourceMap;
+ return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
}
private List<Node> createNodesForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
@@ -222,6 +162,15 @@ public class SortConfig4NormalGroupOperator implements SortConfigOperator {
return nodes;
}
+ private List<Node> createNodesForStream(List<StreamSource> sourceInfos,
+ List<TransformResponse> transformResponses, List<StreamSink> streamSinks) {
+ List<Node> nodes = Lists.newArrayList();
+ nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
+ nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses));
+ nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
+ return nodes;
+ }
+
private List<NodeRelation> createNodeRelationsForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
NodeRelation relation = new NodeRelation();
List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
index ffaeec151..14d8f3258 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
@@ -72,8 +72,7 @@ public class SortConfigListener implements SortOperateListener {
}
try {
- SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getLightweight(),
- groupInfo.getEnableZookeeper());
+ SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getEnableZookeeper());
operator.buildConfig(groupInfo, streamInfos, false);
} catch (Exception e) {
String msg = String.format("failed to build sort config for groupId=%s, ", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java
index e6c35253f..f8c9de5b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java
@@ -30,10 +30,9 @@ public interface SortConfigOperator {
/**
* Determines whether the current instance matches the specified type.
*
- * @param isNormal is the inlong group is normal mode, 0: normal mode, 1: lightweight mode
* @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: disable
*/
- Boolean accept(Integer isNormal, Integer enableZk);
+ Boolean accept(Integer enableZk);
/**
* Build Sort config.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java
index 4c6c547b4..fcd8ed81d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java
@@ -35,15 +35,14 @@ public class SortConfigOperatorFactory {
/**
* Get a Sort config operator instance.
*
- * @param isNormal is the inlong group is normal mode, 0: normal mode, 1: lightweight mode
* @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: disable
*/
- public SortConfigOperator getInstance(Integer isNormal, Integer enableZk) {
+ public SortConfigOperator getInstance(Integer enableZk) {
return operatorList.stream()
- .filter(inst -> inst.accept(isNormal, enableZk))
+ .filter(inst -> inst.accept(enableZk))
.findFirst()
- .orElseThrow(() -> new BusinessException("not found any instance of SortConfigOperator when isNormal="
- + isNormal + ", enableZk=" + enableZk));
+ .orElseThrow(() -> new BusinessException("not found any instance of SortConfigOperator when enableZk="
+ + enableZk));
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
index 17ddacf4d..dda1086fc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.service.sort;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -45,7 +44,6 @@ import java.util.List;
public class StreamSortConfigListener implements SortOperateListener {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamSortConfigListener.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
@Autowired
private SortConfigOperatorFactory operatorFactory;
@@ -79,8 +77,7 @@ public class StreamSortConfigListener implements SortOperateListener {
List<InlongStreamInfo> streamInfos = Collections.singletonList(streamInfo);
try {
- SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getLightweight(),
- groupInfo.getEnableZookeeper());
+ SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getEnableZookeeper());
operator.buildConfig(groupInfo, streamInfos, true);
} catch (Exception e) {
String msg = String.format("failed to build sort config for groupId=%s, streamId=%s, ", groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
deleted file mode 100644
index 90f116821..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort.light;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
-import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
-import org.apache.inlong.manager.service.sort.util.NodeRelationUtils;
-import org.apache.inlong.manager.service.sort.util.TransformNodeUtils;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.transform.StreamTransformService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
-import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.sort.protocol.GroupInfo;
-import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.node.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Event listener of creat light group sort config.
- */
-@Component
-public class LightGroupSortListener implements SortOperateListener {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(LightGroupSortListener.class);
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- @Autowired
- private StreamSourceService sourceService;
- @Autowired
- private StreamSinkService sinkService;
- @Autowired
- private StreamTransformService transformService;
-
- @Override
- public TaskEvent event() {
- return TaskEvent.COMPLETE;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- LOGGER.info("create sort config for light group, context={}", context);
- try {
- LightGroupResourceProcessForm processForm = (LightGroupResourceProcessForm) context.getProcessForm();
- InlongGroupInfo groupInfo = processForm.getGroupInfo();
- List<InlongStreamInfo> streamInfos = processForm.getStreamInfos();
- final String groupId = groupInfo.getInlongGroupId();
- GroupInfo configInfo = this.createGroupInfo(groupInfo, streamInfos);
- String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
- InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
- extInfo.setInlongGroupId(groupId);
- extInfo.setKeyName(InlongConstants.DATAFLOW);
- extInfo.setKeyValue(dataflow);
- if (groupInfo.getExtList() == null) {
- groupInfo.setExtList(Lists.newArrayList());
- }
- upsertExtInfo(groupInfo, extInfo);
- return ListenerResult.success();
- } catch (Throwable t) {
- LOGGER.error("create sort config error: ", t);
- throw new WorkflowListenerException("create sort config error: " + t.getMessage());
- }
- }
-
- private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
- final String groupId = groupInfo.getInlongGroupId();
- List<StreamSource> sourceInfos = sourceService.listSource(groupId, null);
- Map<String, List<StreamSource>> sourceMap = sourceInfos.stream()
- .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
- Collectors.toCollection(ArrayList::new)));
-
- List<StreamSink> streamSinks = sinkService.listSink(groupId, null);
- Map<String, List<StreamSink>> sinkMap = streamSinks.stream()
- .collect(Collectors.groupingBy(StreamSink::getInlongStreamId, HashMap::new,
- Collectors.toCollection(ArrayList::new)));
-
- List<TransformResponse> transformResponses = transformService.listTransform(groupId, null);
- Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
- .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
- Collectors.toCollection(ArrayList::new)));
-
- List<StreamInfo> streamInfos = new ArrayList<>();
- for (InlongStreamInfo stream : streamInfoList) {
- String streamId = stream.getInlongStreamId();
- List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
- transformMap.get(streamId), sinkMap.get(streamId));
- StreamInfo streamInfo = new StreamInfo(streamId, nodes,
- NodeRelationUtils.createNodeRelationsForStream(stream));
- streamInfos.add(streamInfo);
-
- // Rebuild joinerNode relation
- List<TransformResponse> transformResponseList = transformMap.get(streamId);
- NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
- }
-
- return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
- }
-
- private List<Node> createNodesForStream(
- List<StreamSource> sourceInfos,
- List<TransformResponse> transformResponses,
- List<StreamSink> streamSinks) {
- List<Node> nodes = Lists.newArrayList();
- nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
- nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses));
- nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
- return nodes;
- }
-
- private void upsertExtInfo(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
- groupInfo.getExtList().removeIf(ext -> InlongConstants.DATAFLOW.equals(ext.getKeyName()));
- groupInfo.getExtList().add(extInfo);
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortSelector.java
deleted file mode 100644
index 2561d92b5..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortSelector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-import java.util.List;
-
-/**
- * Selector of light group sort.
- */
-@Slf4j
-public class LightGroupSortSelector implements EventSelector {
-
- @Override
- public boolean accept(WorkflowContext context) {
- ProcessForm processForm = context.getProcessForm();
- if (!(processForm instanceof LightGroupResourceProcessForm)) {
- return false;
- }
- LightGroupResourceProcessForm lightGroupResourceProcessForm = (LightGroupResourceProcessForm) processForm;
- List<InlongStreamInfo> streamInfos = lightGroupResourceProcessForm.getStreamInfos();
- if (CollectionUtils.isEmpty(streamInfos)) {
- log.warn(ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
- return false;
- }
- return true;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index a443d323a..3481c97c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -58,19 +58,19 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
protected StreamSourceFieldEntityMapper sourceFieldMapper;
/**
- * Setting the parameters of the latest entity.
+ * Getting the source type.
*
- * @param request source request
- * @param targetEntity entity object which will set the new parameters.
+ * @return source type string.
*/
- protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);
+ protected abstract String getSourceType();
/**
- * Getting the source type.
+ * Setting the parameters of the latest entity.
*
- * @return source type string.
+ * @param request source request
+ * @param targetEntity entity object which will set the new parameters.
*/
- protected abstract String getSourceType();
+ protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);
@Override
@Transactional(rollbackFor = Throwable.class)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 09aa848b1..24eb92487 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -20,13 +20,17 @@ package org.apache.inlong.manager.service.source;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageInfo;
import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import javax.validation.constraints.NotNull;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Interface of the source operator
@@ -64,6 +68,20 @@ public interface StreamSourceOperator {
*/
List<StreamField> getSourceFields(@NotNull Integer sourceId);
+ /**
+ * Get the StreamSource Map by the inlong group info and inlong stream info list.
+ *
+ * @param groupInfo inlong group info
+ * @param streamInfos inlong stream info list
+ * @param streamSources stream source list
+ * @return map of StreamSource list, key-inlongStreamId, value-StreamSourceList
+ * @apiNote The MQ source which was used in InlongGroup must implement the method.
+ */
+ default Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+ List<InlongStreamInfo> streamInfos, List<StreamSource> streamSources) {
+ return new HashMap<>();
+ }
+
/**
* Get source list response from the given source entity page.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 90136dbd4..e3b09fcf4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -18,11 +18,14 @@
package org.apache.inlong.manager.service.source;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import java.util.List;
+import java.util.Map;
/**
* Service layer interface for stream source
@@ -55,6 +58,18 @@ public interface StreamSourceService {
*/
List<StreamSource> listSource(String groupId, String streamId);
+ /**
+ * Get the StreamSource Map by the inlong group info and inlong stream info list.
+ * <p/>
+ * If the group mode is LIGHTWEIGHT, means not using any MQ as a cached source, then just get all related sources.
+ * Otherwise, if the group mode is NORMAL, need get the cached MQ sources.
+ *
+ * @param groupInfo inlong group info
+ * @param streamInfos inlong stream info list
+ * @return map of StreamSource list, key-inlongStreamId, value-StreamSourceList
+ */
+ Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos);
+
/**
* Query the number of undeleted source info based on inlong group and inlong stream id.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 8aa86478e..9f00b6715 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -24,14 +24,17 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -51,8 +54,10 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Implementation of source service interface
@@ -63,7 +68,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamSourceServiceImpl.class);
@Autowired
- private SourceOperatorFactory operationFactory;
+ private SourceOperatorFactory operatorFactory;
@Autowired
private StreamSourceEntityMapper sourceMapper;
@Autowired
@@ -91,13 +96,13 @@ public class StreamSourceServiceImpl implements StreamSourceService {
// According to the source type, save source information
String sourceType = request.getSourceType();
- StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(sourceType));
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(sourceType));
// Remove id in sourceField when save
List<StreamField> streamFields = request.getFieldList();
if (CollectionUtils.isNotEmpty(streamFields)) {
streamFields.forEach(streamField -> streamField.setId(null));
}
- int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
+ int id = sourceOperator.saveOpt(request, groupEntity.getStatus(), operator);
LOGGER.info("success to save source info: {}", request);
return id;
@@ -111,8 +116,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.error("source not found by id={}", id);
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND);
}
- StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
- StreamSource streamSource = operation.getFromEntity(entity);
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(entity.getSourceType()));
+ StreamSource streamSource = sourceOperator.getFromEntity(entity);
LOGGER.debug("success to get source by id={}", id);
return streamSource;
}
@@ -138,6 +143,30 @@ public class StreamSourceServiceImpl implements StreamSourceService {
return responseList;
}
+ @Override
+ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+ List<InlongStreamInfo> streamInfos) {
+ String groupId = groupInfo.getInlongGroupId();
+ LOGGER.debug("begin to get source map for groupId={}", groupId);
+ Map<String, List<StreamSource>> result;
+
+ // if the group mode is LIGHTWEIGHT, just get all related stream sources
+ List<StreamSource> streamSources = this.listSource(groupInfo.getInlongGroupId(), null);
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+ result = streamSources.stream()
+ .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
+ Collectors.toCollection(ArrayList::new)));
+ } else {
+ // if the group mode is NORMAL, needs to get the cached MQ sources
+ String sourceType = groupInfo.getMqType();
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(sourceType));
+ result = sourceOperator.getSourcesMap(groupInfo, streamInfos, streamSources);
+ }
+
+ LOGGER.debug("success to get source map, size={}, groupInfo={}", result.size(), groupInfo);
+ return result;
+ }
+
@Override
public PageInfo<? extends StreamSource> listByCondition(SourcePageRequest request) {
Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
@@ -154,8 +183,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
List<StreamSource> responseList = Lists.newArrayList();
for (Map.Entry<SourceType, Page<StreamSourceEntity>> entry : sourceMap.entrySet()) {
SourceType sourceType = entry.getKey();
- StreamSourceOperator operation = operationFactory.getInstance(sourceType);
- PageInfo<? extends StreamSource> pageInfo = operation.getPageInfo(entry.getValue());
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(sourceType);
+ PageInfo<? extends StreamSource> pageInfo = sourceOperator.getPageInfo(entry.getValue());
if (null != pageInfo && CollectionUtils.isNotEmpty(pageInfo.getList())) {
responseList.addAll(pageInfo.getList());
}
@@ -179,13 +208,13 @@ public class StreamSourceServiceImpl implements StreamSourceService {
InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
String sourceType = request.getSourceType();
- StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(sourceType));
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(sourceType));
// Remove id in sourceField when save
List<StreamField> streamFields = request.getFieldList();
if (CollectionUtils.isNotEmpty(streamFields)) {
streamFields.forEach(streamField -> streamField.setId(null));
}
- operation.updateOpt(request, groupEntity.getStatus(), operator);
+ sourceOperator.updateOpt(request, groupEntity.getStatus(), operator);
LOGGER.info("success to update source info: {}", request);
return true;
@@ -244,10 +273,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
- StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(entity.getSourceType()));
SourceRequest sourceRequest = new SourceRequest();
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
- operation.restartOpt(sourceRequest, operator);
+ sourceOperator.restartOpt(sourceRequest, operator);
LOGGER.info("success to restart source info: {}", entity);
return true;
@@ -262,10 +291,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
- StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
+ StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(entity.getSourceType()));
SourceRequest sourceRequest = new SourceRequest();
CommonBeanUtils.copyProperties(entity, sourceRequest, true);
- operation.stopOpt(sourceRequest, operator);
+ sourceOperator.stopOpt(sourceRequest, operator);
LOGGER.info("success to stop source info: {}", entity);
return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 7409f7e37..9a02a1399 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -18,22 +18,37 @@
package org.apache.inlong.manager.service.source.pulsar;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceDTO;
import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.Map;
/**
* Pulsar stream source operator
@@ -43,6 +58,8 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private InlongClusterService clusterService;
@Override
public Boolean accept(SourceType sourceType) {
@@ -82,4 +99,51 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
return source;
}
+ @Override
+ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+ List<InlongStreamInfo> streamInfos, List<StreamSource> streamSources) {
+ ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ String adminUrl = pulsarCluster.getAdminUrl();
+ String serviceUrl = pulsarCluster.getUrl();
+ String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
+ ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
+
+ Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
+ streamInfos.forEach(streamInfo -> {
+ PulsarSource pulsarSource = new PulsarSource();
+ String streamId = streamInfo.getInlongStreamId();
+ pulsarSource.setSourceName(streamId);
+ pulsarSource.setTenant(tenant);
+ pulsarSource.setNamespace(groupInfo.getMqResource());
+ pulsarSource.setTopic(streamInfo.getMqResource());
+ pulsarSource.setAdminUrl(adminUrl);
+ pulsarSource.setServiceUrl(serviceUrl);
+ pulsarSource.setInlongComponent(true);
+
+ for (StreamSource sourceInfo : streamSources) {
+ if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
+ continue;
+ }
+ if (StringUtils.isEmpty(pulsarSource.getSerializationType())
+ && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
+ pulsarSource.setSerializationType(sourceInfo.getSerializationType());
+ }
+ if (SourceType.forType(sourceInfo.getSourceType()) == SourceType.KAFKA) {
+ pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
+ }
+ }
+
+ // if the SerializationType is still null, set it to the CSV
+ if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
+ pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
+ }
+ pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
+ pulsarSource.setFieldList(streamInfo.getFieldList());
+ sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
+ });
+
+ return sourceMap;
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 8bb953697..6e14acdac 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -19,22 +19,32 @@
package org.apache.inlong.manager.service.source.tubemq;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSource;
import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSourceDTO;
import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSourceRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
/**
* TubeMQ source operator
@@ -44,6 +54,18 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private InlongClusterService clusterService;
+
+ @Override
+ public Boolean accept(SourceType sourceType) {
+ return SourceType.TUBEMQ == sourceType;
+ }
+
+ @Override
+ protected String getSourceType() {
+ return SourceType.TUBEMQ.getType();
+ }
@Override
protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
@@ -57,16 +79,6 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
}
}
- @Override
- protected String getSourceType() {
- return SourceType.TUBEMQ.getType();
- }
-
- @Override
- public Boolean accept(SourceType sourceType) {
- return SourceType.TUBEMQ == sourceType;
- }
-
@Override
public StreamSource getFromEntity(StreamSourceEntity entity) {
TubeMQSource source = new TubeMQSource();
@@ -81,4 +93,32 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
return source;
}
+ @Override
+ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+ List<InlongStreamInfo> streamInfos, List<StreamSource> streamSources) {
+ ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.TUBE);
+ TubeClusterInfo tubeClusterInfo = (TubeClusterInfo) clusterInfo;
+ String masterRpc = tubeClusterInfo.getUrl();
+
+ Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
+ streamInfos.forEach(streamInfo -> {
+ TubeMQSource tubeMQSource = new TubeMQSource();
+ String streamId = streamInfo.getInlongStreamId();
+ tubeMQSource.setSourceName(streamId);
+ tubeMQSource.setTopic(streamInfo.getMqResource());
+ tubeMQSource.setGroupId(streamId);
+ tubeMQSource.setMasterRpc(masterRpc);
+ for (StreamSource sourceInfo : streamSources) {
+ if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
+ continue;
+ }
+ tubeMQSource.setSerializationType(sourceInfo.getSerializationType());
+ }
+ tubeMQSource.setFieldList(streamInfo.getFieldList());
+ sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(tubeMQSource);
+ });
+
+ return sourceMap;
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index d84d7bed0..650bef608 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -31,8 +31,6 @@ import org.apache.inlong.manager.service.mq.TubeEventSelector;
import org.apache.inlong.manager.service.resource.SinkResourceListener;
import org.apache.inlong.manager.service.sort.SortConfigListener;
import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
-import org.apache.inlong.manager.service.sort.light.LightGroupSortListener;
-import org.apache.inlong.manager.service.sort.light.LightGroupSortSelector;
import org.apache.inlong.manager.service.source.listener.SourceDeleteEventSelector;
import org.apache.inlong.manager.service.source.listener.SourceDeleteListener;
import org.apache.inlong.manager.service.source.listener.SourceRestartEventSelector;
@@ -93,13 +91,9 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
@Autowired
private SinkResourceListener sinkResourceListener;
-
@Autowired
private SortConfigListener sortConfigListener;
- @Autowired
- private LightGroupSortListener lightGroupSortListener;
-
@PostConstruct
public void init() {
sourceOperateListeners = new LinkedHashMap<>();
@@ -114,7 +108,6 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
queueOperateListeners.put(deletePulsarResourceTaskListener, new PulsarResourceDeleteSelector());
sortOperateListeners = new LinkedHashMap<>();
sortOperateListeners.put(sortConfigListener, new ZookeeperDisabledSelector());
- sortOperateListeners.put(lightGroupSortListener, new LightGroupSortSelector());
}
public void clearListeners() {