You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/15 11:05:16 UTC

[inlong] branch master updated: [INLONG-5030][Manager] Use SortConfigListener replace the LightGroupSortListener (#5032)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 926efc66d [INLONG-5030][Manager] Use SortConfigListener replace the LightGroupSortListener (#5032)
926efc66d is described below

commit 926efc66db7e1cb6398936cd1e781de563d587c5
Author: healchow <he...@gmail.com>
AuthorDate: Fri Jul 15 19:05:10 2022 +0800

    [INLONG-5030][Manager] Use SortConfigListener replace the LightGroupSortListener (#5032)
---
 .../common/pojo/source/tubemq/TubeMQSource.java    |   2 +-
 .../form/process/GroupResourceProcessForm.java     |  21 +--
 .../manager/service/sink/StreamSinkService.java    |  12 ++
 .../service/sink/StreamSinkServiceImpl.java        |  18 ++
 ...perator.java => DefaultSortConfigOperator.java} | 189 ++++++++-------------
 .../manager/service/sort/SortConfigListener.java   |   3 +-
 .../manager/service/sort/SortConfigOperator.java   |   3 +-
 .../service/sort/SortConfigOperatorFactory.java    |   9 +-
 .../service/sort/StreamSortConfigListener.java     |   5 +-
 .../service/sort/light/LightGroupSortListener.java | 152 -----------------
 .../service/sort/light/LightGroupSortSelector.java |  51 ------
 .../service/source/AbstractSourceOperator.java     |  14 +-
 .../service/source/StreamSourceOperator.java       |  18 ++
 .../service/source/StreamSourceService.java        |  15 ++
 .../service/source/StreamSourceServiceImpl.java    |  55 ++++--
 .../source/pulsar/PulsarSourceOperator.java        |  64 +++++++
 .../source/tubemq/TubeMQSourceOperator.java        |  60 +++++--
 .../listener/GroupTaskListenerFactory.java         |   7 -
 18 files changed, 309 insertions(+), 389 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java
index 2458a5509..74d85d09d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/tubemq/TubeMQSource.java
@@ -58,7 +58,7 @@ public class TubeMQSource extends StreamSource {
     private String sessionKey;
 
     /**
-     * The tubemq consumers use this tid set to filter records reading from server.
+     * The TubeMQ consumers use this tid set to filter records reading from server.
      */
     @ApiModelProperty("Tid of the TubeMQ")
     private TreeSet<String> tid;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java
index 03c9e15cb..61547477e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/GroupResourceProcessForm.java
@@ -17,16 +17,15 @@
 
 package org.apache.inlong.manager.common.pojo.workflow.form.process;
 
-import com.google.common.collect.Maps;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.exceptions.FormValidateException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -41,22 +40,13 @@ public class GroupResourceProcessForm extends BaseProcessForm {
 
     private InlongGroupInfo groupInfo;
 
-    @Getter
-    @Setter
-    private GroupOperateType groupOperateType = GroupOperateType.INIT;
-
     private List<InlongStreamInfo> streamInfos;
 
-    public InlongGroupInfo getGroupInfo() {
-        return groupInfo;
-    }
-
-    public void setGroupInfo(InlongGroupInfo groupInfo) {
-        this.groupInfo = groupInfo;
-    }
+    private GroupOperateType groupOperateType = GroupOperateType.INIT;
 
     @Override
     public void validate() throws FormValidateException {
+        Preconditions.checkNotNull(groupInfo, "InlongGroupInfo cannot be null");
     }
 
     @Override
@@ -71,9 +61,10 @@ public class GroupResourceProcessForm extends BaseProcessForm {
 
     @Override
     public Map<String, Object> showInList() {
-        Map<String, Object> show = Maps.newHashMap();
+        Map<String, Object> show = new HashMap<>();
         show.put("inlongGroupId", groupInfo.getInlongGroupId());
         show.put("groupOperateType", this.groupOperateType);
         return show;
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index c141a0aea..aad07b29f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -18,13 +18,16 @@
 package org.apache.inlong.manager.service.sink;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Service layer interface for stream sink
@@ -66,6 +69,15 @@ public interface StreamSinkService {
      */
     List<SinkBriefResponse> listBrief(String groupId, String streamId);
 
+    /**
+     * Get the StreamSink Map by the inlong group info and inlong stream info list.
+     *
+     * @param groupInfo inlong group info
+     * @param streamInfos inlong stream info list
+     * @return map of StreamSink list, key-inlongStreamId, value-StreamSinkList
+     */
+    Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos);
+
     /**
      * Query the number of undeleted sink info based on inlong group and inlong stream id
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 9327c26d7..cd9c64337 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -30,12 +30,14 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
@@ -53,9 +55,11 @@ import org.springframework.transaction.annotation.Transactional;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * Implementation of sink service interface
@@ -161,6 +165,20 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         return summaryList;
     }
 
+    @Override
+    public Map<String, List<StreamSink>> getSinksMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos) {
+        String groupId = groupInfo.getInlongGroupId();
+        LOGGER.debug("begin to get sink map for groupId={}", groupId);
+
+        List<StreamSink> streamSinks = this.listSink(groupId, null);
+        Map<String, List<StreamSink>> result = streamSinks.stream()
+                .collect(Collectors.groupingBy(StreamSink::getInlongStreamId, HashMap::new,
+                        Collectors.toCollection(ArrayList::new)));
+
+        LOGGER.debug("success to get sink map, size={}, groupInfo={}", result.size(), groupInfo);
+        return result;
+    }
+
     @Override
     public PageInfo<? extends StreamSink> listByCondition(SinkPageRequest request) {
         Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
similarity index 50%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
index 95674cee9..cf77c6c28 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfig4NormalGroupOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
@@ -18,35 +18,25 @@
 package org.apache.inlong.manager.service.sort;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.enums.SourceType;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
-import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
-import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
 import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.service.sort.util.NodeRelationUtils;
+import org.apache.inlong.manager.service.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.service.transform.StreamTransformService;
 import org.apache.inlong.sort.protocol.GroupInfo;
 import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
 import org.apache.inlong.sort.protocol.node.Node;
 import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
 import org.slf4j.Logger;
@@ -61,25 +51,24 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
- * Sort config operator, used to create a Sort config for the InlongGroup in normal mode with ZK disabled.
+ * Default Sort config operator, used to create a Sort config for the InlongGroup with ZK disabled.
  */
 @Service
-public class SortConfig4NormalGroupOperator implements SortConfigOperator {
+public class DefaultSortConfigOperator implements SortConfigOperator {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SortConfig4NormalGroupOperator.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     @Autowired
     private StreamSourceService sourceService;
     @Autowired
-    private StreamSinkService sinkService;
+    private StreamTransformService transformService;
     @Autowired
-    private InlongClusterService clusterService;
+    private StreamSinkService sinkService;
 
     @Override
-    public Boolean accept(Integer isNormal, Integer enableZk) {
-        return InlongConstants.NORMAL_MODE.equals(isNormal)
-                && InlongConstants.DISABLE_ZK.equals(enableZk);
+    public Boolean accept(Integer enableZk) {
+        return InlongConstants.DISABLE_ZK.equals(enableZk);
     }
 
     @Override
@@ -90,32 +79,67 @@ public class SortConfig4NormalGroupOperator implements SortConfigOperator {
             return;
         }
 
-        GroupInfo configInfo = this.createSortGroupInfo(groupInfo, streamInfos);
-        String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
+        GroupInfo configInfo;
+        // if the inlong group is in LIGHTWEIGHT mode, no MQ is used as a cached source
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
+        } else {
+            configInfo = this.getNormalGroupInfo(groupInfo, streamInfos);
+        }
 
+        String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
         if (isStream) {
             this.addToStreamExt(streamInfos, dataflow);
         } else {
             this.addToGroupExt(groupInfo, dataflow);
         }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("success to build sort config, isStream={}, dataflow={}", isStream, dataflow);
+        }
+    }
+
+    private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+        // get source info
+        Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
+        // get sink info
+        String groupId = groupInfo.getInlongGroupId();
+        Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
+
+        List<TransformResponse> transformResponses = transformService.listTransform(groupId, null);
+        Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
+                .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
+                        Collectors.toCollection(ArrayList::new)));
+
+        List<StreamInfo> sortStreamInfos = new ArrayList<>();
+        for (InlongStreamInfo inlongStream : streamInfoList) {
+            String streamId = inlongStream.getInlongStreamId();
+
+            List<TransformResponse> transformResponseList = transformMap.get(streamId);
+            List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
+                    transformResponseList, sinkMap.get(streamId));
+            StreamInfo streamInfo = new StreamInfo(streamId, nodes,
+                    NodeRelationUtils.createNodeRelationsForStream(inlongStream));
+            sortStreamInfos.add(streamInfo);
+
+            // rebuild joinerNode relation
+            NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
+        }
+
+        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
     /**
-     * Create GroupInfo for Sort protocol.
+     * Get Sort GroupInfo of normal inlong group.
      *
      * @see org.apache.inlong.sort.protocol.GroupInfo
      */
-    private GroupInfo createSortGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
-        String groupId = groupInfo.getInlongGroupId();
-        List<StreamSink> streamSinks = sinkService.listSink(groupId, null);
-        Map<String, List<StreamSink>> sinkMap = streamSinks.stream()
-                .collect(Collectors.groupingBy(StreamSink::getInlongStreamId, HashMap::new,
-                        Collectors.toCollection(ArrayList::new)));
-
+    private GroupInfo getNormalGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
         // get source info
-        Map<String, List<StreamSource>> sourceMap;
-        MQType.forType(groupInfo.getMqType());
-        sourceMap = createMQSources(groupInfo, streamInfoList);
+        Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
+        // get sink info
+        Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
+
         // create StreamInfo for Sort protocol
         List<StreamInfo> sortStreamInfos = new ArrayList<>();
         for (InlongStreamInfo inlongStream : streamInfoList) {
@@ -128,91 +152,7 @@ public class SortConfig4NormalGroupOperator implements SortConfigOperator {
             sortStreamInfos.add(sortStream);
         }
 
-        return new GroupInfo(groupId, sortStreamInfos);
-    }
-
-    private Map<String, List<StreamSource>> createMQSources(
-            InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
-        if (MQType.forType(groupInfo.getMqType()) == MQType.TUBE) {
-            return createTubeSource(groupInfo, streamInfoList);
-        }
-        return createPulsarSource(groupInfo, streamInfoList);
-    }
-
-    /**
-     * Create Tube sources for Sort.
-     */
-    private Map<String, List<StreamSource>> createTubeSource(
-            InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
-        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
-                ClusterType.TUBE);
-        TubeClusterInfo tubeClusterInfo = (TubeClusterInfo) clusterInfo;
-        String masterRpc = tubeClusterInfo.getUrl();
-        Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
-        streamInfoList.forEach(streamInfo -> {
-            TubeMQSource tubeMQSource = new TubeMQSource();
-            String streamId = streamInfo.getInlongStreamId();
-            tubeMQSource.setSourceName(streamId);
-            tubeMQSource.setTopic(streamInfo.getMqResource());
-            tubeMQSource.setGroupId(streamId);
-            tubeMQSource.setMasterRpc(masterRpc);
-            List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(), streamId);
-            for (StreamSource sourceInfo : sourceInfos) {
-                tubeMQSource.setSerializationType(sourceInfo.getSerializationType());
-            }
-            tubeMQSource.setFieldList(streamInfo.getFieldList());
-            sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(tubeMQSource);
-        });
-
-        return sourceMap;
-    }
-
-    /**
-     * Create Pulsar sources for Sort.
-     */
-    private Map<String, List<StreamSource>> createPulsarSource(
-            InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
-        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
-                ClusterType.PULSAR);
-        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
-        String adminUrl = pulsarCluster.getAdminUrl();
-        String serviceUrl = pulsarCluster.getUrl();
-        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
-                ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
-
-        Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
-        streamInfoList.forEach(streamInfo -> {
-            PulsarSource pulsarSource = new PulsarSource();
-            String streamId = streamInfo.getInlongStreamId();
-            pulsarSource.setSourceName(streamId);
-            pulsarSource.setTenant(tenant);
-            pulsarSource.setNamespace(groupInfo.getMqResource());
-            pulsarSource.setTopic(streamInfo.getMqResource());
-            pulsarSource.setAdminUrl(adminUrl);
-            pulsarSource.setServiceUrl(serviceUrl);
-            pulsarSource.setInlongComponent(true);
-
-            List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(), streamId);
-            for (StreamSource sourceInfo : sourceInfos) {
-                if (StringUtils.isEmpty(pulsarSource.getSerializationType())
-                        && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
-                    pulsarSource.setSerializationType(sourceInfo.getSerializationType());
-                }
-                if (SourceType.forType(sourceInfo.getSourceType()) == SourceType.KAFKA) {
-                    pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
-                }
-            }
-
-            // if the SerializationType is still null, set it to the CSV
-            if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
-                pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
-            }
-            pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
-            pulsarSource.setFieldList(streamInfo.getFieldList());
-            sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
-        });
-
-        return sourceMap;
+        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
     }
 
     private List<Node> createNodesForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
@@ -222,6 +162,15 @@ public class SortConfig4NormalGroupOperator implements SortConfigOperator {
         return nodes;
     }
 
+    private List<Node> createNodesForStream(List<StreamSource> sourceInfos,
+            List<TransformResponse> transformResponses, List<StreamSink> streamSinks) {
+        List<Node> nodes = Lists.newArrayList();
+        nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
+        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses));
+        nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
+        return nodes;
+    }
+
     private List<NodeRelation> createNodeRelationsForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
         NodeRelation relation = new NodeRelation();
         List<String> inputs = sources.stream().map(StreamSource::getSourceName).collect(Collectors.toList());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
index ffaeec151..14d8f3258 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigListener.java
@@ -72,8 +72,7 @@ public class SortConfigListener implements SortOperateListener {
         }
 
         try {
-            SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getLightweight(),
-                    groupInfo.getEnableZookeeper());
+            SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getEnableZookeeper());
             operator.buildConfig(groupInfo, streamInfos, false);
         } catch (Exception e) {
             String msg = String.format("failed to build sort config for groupId=%s, ", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java
index e6c35253f..f8c9de5b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperator.java
@@ -30,10 +30,9 @@ public interface SortConfigOperator {
     /**
      * Determines whether the current instance matches the specified type.
      *
-     * @param isNormal is the inlong group is normal mode, 0: normal mode, 1: lightweight mode
      * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: disable
      */
-    Boolean accept(Integer isNormal, Integer enableZk);
+    Boolean accept(Integer enableZk);
 
     /**
      * Build Sort config.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java
index 4c6c547b4..fcd8ed81d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/SortConfigOperatorFactory.java
@@ -35,15 +35,14 @@ public class SortConfigOperatorFactory {
     /**
      * Get a Sort config operator instance.
      *
-     * @param isNormal is the inlong group is normal mode, 0: normal mode, 1: lightweight mode
      * @param enableZk is the inlong group enable the ZooKeeper, 1: enable, 0: disable
      */
-    public SortConfigOperator getInstance(Integer isNormal, Integer enableZk) {
+    public SortConfigOperator getInstance(Integer enableZk) {
         return operatorList.stream()
-                .filter(inst -> inst.accept(isNormal, enableZk))
+                .filter(inst -> inst.accept(enableZk))
                 .findFirst()
-                .orElseThrow(() -> new BusinessException("not found any instance of SortConfigOperator when isNormal="
-                        + isNormal + ", enableZk=" + enableZk));
+                .orElseThrow(() -> new BusinessException("not found any instance of SortConfigOperator when enableZk="
+                        + enableZk));
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
index 17ddacf4d..dda1086fc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/StreamSortConfigListener.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.service.sort;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -45,7 +44,6 @@ import java.util.List;
 public class StreamSortConfigListener implements SortOperateListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamSortConfigListener.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
 
     @Autowired
     private SortConfigOperatorFactory operatorFactory;
@@ -79,8 +77,7 @@ public class StreamSortConfigListener implements SortOperateListener {
 
         List<InlongStreamInfo> streamInfos = Collections.singletonList(streamInfo);
         try {
-            SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getLightweight(),
-                    groupInfo.getEnableZookeeper());
+            SortConfigOperator operator = operatorFactory.getInstance(groupInfo.getEnableZookeeper());
             operator.buildConfig(groupInfo, streamInfos, true);
         } catch (Exception e) {
             String msg = String.format("failed to build sort config for groupId=%s, streamId=%s, ", groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
deleted file mode 100644
index 90f116821..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortListener.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort.light;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
-import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
-import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
-import org.apache.inlong.manager.service.sort.util.NodeRelationUtils;
-import org.apache.inlong.manager.service.sort.util.TransformNodeUtils;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.transform.StreamTransformService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
-import org.apache.inlong.manager.workflow.event.task.TaskEvent;
-import org.apache.inlong.sort.protocol.GroupInfo;
-import org.apache.inlong.sort.protocol.StreamInfo;
-import org.apache.inlong.sort.protocol.node.Node;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Event listener of creat light group sort config.
- */
-@Component
-public class LightGroupSortListener implements SortOperateListener {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(LightGroupSortListener.class);
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-    @Autowired
-    private StreamSourceService sourceService;
-    @Autowired
-    private StreamSinkService sinkService;
-    @Autowired
-    private StreamTransformService transformService;
-
-    @Override
-    public TaskEvent event() {
-        return TaskEvent.COMPLETE;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        LOGGER.info("create sort config for light group, context={}", context);
-        try {
-            LightGroupResourceProcessForm processForm = (LightGroupResourceProcessForm) context.getProcessForm();
-            InlongGroupInfo groupInfo = processForm.getGroupInfo();
-            List<InlongStreamInfo> streamInfos = processForm.getStreamInfos();
-            final String groupId = groupInfo.getInlongGroupId();
-            GroupInfo configInfo = this.createGroupInfo(groupInfo, streamInfos);
-            String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
-            InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
-            extInfo.setInlongGroupId(groupId);
-            extInfo.setKeyName(InlongConstants.DATAFLOW);
-            extInfo.setKeyValue(dataflow);
-            if (groupInfo.getExtList() == null) {
-                groupInfo.setExtList(Lists.newArrayList());
-            }
-            upsertExtInfo(groupInfo, extInfo);
-            return ListenerResult.success();
-        } catch (Throwable t) {
-            LOGGER.error("create sort config error: ", t);
-            throw new WorkflowListenerException("create sort config error: " + t.getMessage());
-        }
-    }
-
-    private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
-        final String groupId = groupInfo.getInlongGroupId();
-        List<StreamSource> sourceInfos = sourceService.listSource(groupId, null);
-        Map<String, List<StreamSource>> sourceMap = sourceInfos.stream()
-                .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
-                        Collectors.toCollection(ArrayList::new)));
-
-        List<StreamSink> streamSinks = sinkService.listSink(groupId, null);
-        Map<String, List<StreamSink>> sinkMap = streamSinks.stream()
-                .collect(Collectors.groupingBy(StreamSink::getInlongStreamId, HashMap::new,
-                        Collectors.toCollection(ArrayList::new)));
-
-        List<TransformResponse> transformResponses = transformService.listTransform(groupId, null);
-        Map<String, List<TransformResponse>> transformMap = transformResponses.stream()
-                .collect(Collectors.groupingBy(TransformResponse::getInlongStreamId, HashMap::new,
-                        Collectors.toCollection(ArrayList::new)));
-
-        List<StreamInfo> streamInfos = new ArrayList<>();
-        for (InlongStreamInfo stream : streamInfoList) {
-            String streamId = stream.getInlongStreamId();
-            List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
-                    transformMap.get(streamId), sinkMap.get(streamId));
-            StreamInfo streamInfo = new StreamInfo(streamId, nodes,
-                    NodeRelationUtils.createNodeRelationsForStream(stream));
-            streamInfos.add(streamInfo);
-
-            // Rebuild joinerNode relation
-            List<TransformResponse> transformResponseList = transformMap.get(streamId);
-            NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
-        }
-
-        return new GroupInfo(groupInfo.getInlongGroupId(), streamInfos);
-    }
-
-    private List<Node> createNodesForStream(
-            List<StreamSource> sourceInfos,
-            List<TransformResponse> transformResponses,
-            List<StreamSink> streamSinks) {
-        List<Node> nodes = Lists.newArrayList();
-        nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
-        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses));
-        nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
-        return nodes;
-    }
-
-    private void upsertExtInfo(InlongGroupInfo groupInfo, InlongGroupExtInfo extInfo) {
-        groupInfo.getExtList().removeIf(ext -> InlongConstants.DATAFLOW.equals(ext.getKeyName()));
-        groupInfo.getExtList().add(extInfo);
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortSelector.java
deleted file mode 100644
index 2561d92b5..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/light/LightGroupSortSelector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.sort.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.EventSelector;
-
-import java.util.List;
-
-/**
- * Selector of light group sort.
- */
-@Slf4j
-public class LightGroupSortSelector implements EventSelector {
-
-    @Override
-    public boolean accept(WorkflowContext context) {
-        ProcessForm processForm = context.getProcessForm();
-        if (!(processForm instanceof LightGroupResourceProcessForm)) {
-            return false;
-        }
-        LightGroupResourceProcessForm lightGroupResourceProcessForm = (LightGroupResourceProcessForm) processForm;
-        List<InlongStreamInfo> streamInfos = lightGroupResourceProcessForm.getStreamInfos();
-        if (CollectionUtils.isEmpty(streamInfos)) {
-            log.warn(ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
-            return false;
-        }
-        return true;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index a443d323a..3481c97c2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -58,19 +58,19 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
     protected StreamSourceFieldEntityMapper sourceFieldMapper;
 
     /**
-     * Setting the parameters of the latest entity.
+     * Getting the source type.
      *
-     * @param request source request
-     * @param targetEntity entity object which will set the new parameters.
+     * @return source type string.
      */
-    protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);
+    protected abstract String getSourceType();
 
     /**
-     * Getting the source type.
+     * Setting the parameters of the latest entity.
      *
-     * @return source type string.
+     * @param request source request
+     * @param targetEntity entity object which will set the new parameters.
      */
-    protected abstract String getSourceType();
+    protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
index 09aa848b1..24eb92487 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java
@@ -20,13 +20,17 @@ package org.apache.inlong.manager.service.source;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageInfo;
 import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 
 import javax.validation.constraints.NotNull;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Interface of the source operator
@@ -64,6 +68,20 @@ public interface StreamSourceOperator {
      */
     List<StreamField> getSourceFields(@NotNull Integer sourceId);
 
+    /**
+     * Get the StreamSource Map by the inlong group info and inlong stream info list.
+     *
+     * @param groupInfo inlong group info
+     * @param streamInfos inlong stream info list
+     * @param streamSources stream source list
+     * @return map of StreamSource list, key-inlongStreamId, value-StreamSourceList
+     * @apiNote The operator of any MQ source that is used by an InlongGroup must override this method.
+     */
+    default Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+            List<InlongStreamInfo> streamInfos, List<StreamSource> streamSources) {
+        return new HashMap<>();
+    }
+
     /**
      * Get source list response from the given source entity page.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 90136dbd4..e3b09fcf4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -18,11 +18,14 @@
 package org.apache.inlong.manager.service.source;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Service layer interface for stream source
@@ -55,6 +58,18 @@ public interface StreamSourceService {
      */
     List<StreamSource> listSource(String groupId, String streamId);
 
+    /**
+     * Get the StreamSource Map by the inlong group info and inlong stream info list.
+     * <p>
+     * If the group mode is LIGHTWEIGHT, no MQ is used as a cached source, so just get all related sources.
+     * Otherwise, if the group mode is NORMAL, the cached MQ sources need to be fetched.
+     *
+     * @param groupInfo inlong group info
+     * @param streamInfos inlong stream info list
+     * @return map of StreamSource list, key-inlongStreamId, value-StreamSourceList
+     */
+    Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos);
+
     /**
      * Query the number of undeleted source info based on inlong group and inlong stream id.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 8aa86478e..9f00b6715 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -24,14 +24,17 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -51,8 +54,10 @@ import org.springframework.transaction.annotation.Transactional;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Implementation of source service interface
@@ -63,7 +68,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamSourceServiceImpl.class);
 
     @Autowired
-    private SourceOperatorFactory operationFactory;
+    private SourceOperatorFactory operatorFactory;
     @Autowired
     private StreamSourceEntityMapper sourceMapper;
     @Autowired
@@ -91,13 +96,13 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         // According to the source type, save source information
         String sourceType = request.getSourceType();
-        StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(sourceType));
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(sourceType));
         // Remove id in sourceField when save
         List<StreamField> streamFields = request.getFieldList();
         if (CollectionUtils.isNotEmpty(streamFields)) {
             streamFields.forEach(streamField -> streamField.setId(null));
         }
-        int id = operation.saveOpt(request, groupEntity.getStatus(), operator);
+        int id = sourceOperator.saveOpt(request, groupEntity.getStatus(), operator);
 
         LOGGER.info("success to save source info: {}", request);
         return id;
@@ -111,8 +116,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
             LOGGER.error("source not found by id={}", id);
             throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_NOT_FOUND);
         }
-        StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
-        StreamSource streamSource = operation.getFromEntity(entity);
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(entity.getSourceType()));
+        StreamSource streamSource = sourceOperator.getFromEntity(entity);
         LOGGER.debug("success to get source by id={}", id);
         return streamSource;
     }
@@ -138,6 +143,30 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         return responseList;
     }
 
+    @Override
+    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+            List<InlongStreamInfo> streamInfos) {
+        String groupId = groupInfo.getInlongGroupId();
+        LOGGER.debug("begin to get source map for groupId={}", groupId);
+        Map<String, List<StreamSource>> result;
+
+        // list all related stream sources; if the group mode is LIGHTWEIGHT, just group them by stream id
+        List<StreamSource> streamSources = this.listSource(groupInfo.getInlongGroupId(), null);
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            result = streamSources.stream()
+                    .collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
+                            Collectors.toCollection(ArrayList::new)));
+        } else {
+            // if the group mode is NORMAL, get the cached MQ sources from the operator of the MQ type
+            String sourceType = groupInfo.getMqType();
+            StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(sourceType));
+            result = sourceOperator.getSourcesMap(groupInfo, streamInfos, streamSources);
+        }
+
+        LOGGER.debug("success to get source map, size={}, groupInfo={}", result.size(), groupInfo);
+        return result;
+    }
+
     @Override
     public PageInfo<? extends StreamSource> listByCondition(SourcePageRequest request) {
         Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
@@ -154,8 +183,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         List<StreamSource> responseList = Lists.newArrayList();
         for (Map.Entry<SourceType, Page<StreamSourceEntity>> entry : sourceMap.entrySet()) {
             SourceType sourceType = entry.getKey();
-            StreamSourceOperator operation = operationFactory.getInstance(sourceType);
-            PageInfo<? extends StreamSource> pageInfo = operation.getPageInfo(entry.getValue());
+            StreamSourceOperator sourceOperator = operatorFactory.getInstance(sourceType);
+            PageInfo<? extends StreamSource> pageInfo = sourceOperator.getPageInfo(entry.getValue());
             if (null != pageInfo && CollectionUtils.isNotEmpty(pageInfo.getList())) {
                 responseList.addAll(pageInfo.getList());
             }
@@ -179,13 +208,13 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
 
         String sourceType = request.getSourceType();
-        StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(sourceType));
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(sourceType));
         // Remove id in sourceField when save
         List<StreamField> streamFields = request.getFieldList();
         if (CollectionUtils.isNotEmpty(streamFields)) {
             streamFields.forEach(streamField -> streamField.setId(null));
         }
-        operation.updateOpt(request, groupEntity.getStatus(), operator);
+        sourceOperator.updateOpt(request, groupEntity.getStatus(), operator);
 
         LOGGER.info("success to update source info: {}", request);
         return true;
@@ -244,10 +273,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
-        StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(entity.getSourceType()));
         SourceRequest sourceRequest = new SourceRequest();
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
-        operation.restartOpt(sourceRequest, operator);
+        sourceOperator.restartOpt(sourceRequest, operator);
 
         LOGGER.info("success to restart source info: {}", entity);
         return true;
@@ -262,10 +291,10 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
         groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
-        StreamSourceOperator operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
+        StreamSourceOperator sourceOperator = operatorFactory.getInstance(SourceType.forType(entity.getSourceType()));
         SourceRequest sourceRequest = new SourceRequest();
         CommonBeanUtils.copyProperties(entity, sourceRequest, true);
-        operation.stopOpt(sourceRequest, operator);
+        sourceOperator.stopOpt(sourceRequest, operator);
 
         LOGGER.info("success to stop source info: {}", entity);
         return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index 7409f7e37..9a02a1399 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -18,22 +18,37 @@
 package org.apache.inlong.manager.service.source.pulsar;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
 import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
 import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceDTO;
 import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Pulsar stream source operator
@@ -43,6 +58,8 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
 
     @Autowired
     private ObjectMapper objectMapper;
+    @Autowired
+    private InlongClusterService clusterService;
 
     @Override
     public Boolean accept(SourceType sourceType) {
@@ -82,4 +99,51 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
         return source;
     }
 
+    @Override
+    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+            List<InlongStreamInfo> streamInfos, List<StreamSource> streamSources) {
+        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+        String adminUrl = pulsarCluster.getAdminUrl();
+        String serviceUrl = pulsarCluster.getUrl();
+        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
+                ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
+
+        Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
+        streamInfos.forEach(streamInfo -> {
+            PulsarSource pulsarSource = new PulsarSource();
+            String streamId = streamInfo.getInlongStreamId();
+            pulsarSource.setSourceName(streamId);
+            pulsarSource.setTenant(tenant);
+            pulsarSource.setNamespace(groupInfo.getMqResource());
+            pulsarSource.setTopic(streamInfo.getMqResource());
+            pulsarSource.setAdminUrl(adminUrl);
+            pulsarSource.setServiceUrl(serviceUrl);
+            pulsarSource.setInlongComponent(true);
+
+            for (StreamSource sourceInfo : streamSources) {
+                if (!Objects.equal(streamId, sourceInfo.getInlongStreamId())) {
+                    continue;
+                }
+                if (StringUtils.isEmpty(pulsarSource.getSerializationType())
+                        && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
+                    pulsarSource.setSerializationType(sourceInfo.getSerializationType());
+                }
+                if (SourceType.forType(sourceInfo.getSourceType()) == SourceType.KAFKA) {
+                    pulsarSource.setPrimaryKey(((KafkaSource) sourceInfo).getPrimaryKey());
+                }
+            }
+
+            // if the serialization type is still empty, default to CSV
+            if (StringUtils.isEmpty(pulsarSource.getSerializationType())) {
+                pulsarSource.setSerializationType(DataTypeEnum.CSV.getName());
+            }
+            pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
+            pulsarSource.setFieldList(streamInfo.getFieldList());
+            sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
+        });
+
+        return sourceMap;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
index 8bb953697..6e14acdac 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/tubemq/TubeMQSourceOperator.java
@@ -19,22 +19,32 @@
 package org.apache.inlong.manager.service.source.tubemq;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSource;
 import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSourceDTO;
 import org.apache.inlong.manager.common.pojo.source.tubemq.TubeMQSourceRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  * TubeMQ source operator
@@ -44,6 +54,18 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
 
     @Autowired
     private ObjectMapper objectMapper;
+    @Autowired
+    private InlongClusterService clusterService;
+
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return SourceType.TUBEMQ.equals(sourceType); // true only for the TUBEMQ source type
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.TUBEMQ.getType(); // type string of the TUBEMQ source type
+    }
 
     @Override
     protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
@@ -57,16 +79,6 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
         }
     }
 
-    @Override
-    protected String getSourceType() {
-        return SourceType.TUBEMQ.getType();
-    }
-
-    @Override
-    public Boolean accept(SourceType sourceType) {
-        return SourceType.TUBEMQ == sourceType;
-    }
-
     @Override
     public StreamSource getFromEntity(StreamSourceEntity entity) {
         TubeMQSource source = new TubeMQSource();
@@ -81,4 +93,32 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
         return source;
     }
 
+    /**
+     * Assembles one TubeMQ source per stream, keyed by stream ID, for the given group.
+     */
+    @Override
+    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
+            List<InlongStreamInfo> streamInfos, List<StreamSource> streamSources) {
+        // the TubeMQ cluster is looked up by the group's cluster tag; its URL becomes the master RPC
+        ClusterInfo cluster = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.TUBE);
+        String masterRpc = ((TubeClusterInfo) cluster).getUrl();
+
+        Map<String, List<StreamSource>> result = Maps.newHashMap();
+        for (InlongStreamInfo stream : streamInfos) {
+            TubeMQSource source = new TubeMQSource();
+            String streamId = stream.getInlongStreamId();
+            source.setSourceName(streamId);
+            source.setTopic(stream.getMqResource());
+            source.setGroupId(streamId);
+            source.setMasterRpc(masterRpc);
+            // each existing source of this stream is applied in list order, so the last match wins
+            streamSources.stream()
+                    .filter(existing -> Objects.equals(streamId, existing.getInlongStreamId()))
+                    .forEachOrdered(existing -> source.setSerializationType(existing.getSerializationType()));
+            source.setFieldList(stream.getFieldList());
+            result.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(source);
+        }
+        return result;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
index d84d7bed0..650bef608 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/listener/GroupTaskListenerFactory.java
@@ -31,8 +31,6 @@ import org.apache.inlong.manager.service.mq.TubeEventSelector;
 import org.apache.inlong.manager.service.resource.SinkResourceListener;
 import org.apache.inlong.manager.service.sort.SortConfigListener;
 import org.apache.inlong.manager.service.sort.ZookeeperDisabledSelector;
-import org.apache.inlong.manager.service.sort.light.LightGroupSortListener;
-import org.apache.inlong.manager.service.sort.light.LightGroupSortSelector;
 import org.apache.inlong.manager.service.source.listener.SourceDeleteEventSelector;
 import org.apache.inlong.manager.service.source.listener.SourceDeleteListener;
 import org.apache.inlong.manager.service.source.listener.SourceRestartEventSelector;
@@ -93,13 +91,9 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
 
     @Autowired
     private SinkResourceListener sinkResourceListener;
-
     @Autowired
     private SortConfigListener sortConfigListener;
 
-    @Autowired
-    private LightGroupSortListener lightGroupSortListener;
-
     @PostConstruct
     public void init() {
         sourceOperateListeners = new LinkedHashMap<>();
@@ -114,7 +108,6 @@ public class GroupTaskListenerFactory implements PluginBinder, ServiceTaskListen
         queueOperateListeners.put(deletePulsarResourceTaskListener, new PulsarResourceDeleteSelector());
         sortOperateListeners = new LinkedHashMap<>();
         sortOperateListeners.put(sortConfigListener, new ZookeeperDisabledSelector());
-        sortOperateListeners.put(lightGroupSortListener, new LightGroupSortSelector());
     }
 
     public void clearListeners() {