You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/26 08:31:21 UTC

[incubator-inlong] branch master updated: [INLONG-3369] Add streamsource in group/list Api (#3374)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 305d458  [INLONG-3369] Add streamsource in group/list Api (#3374)
305d458 is described below

commit 305d458b7647066cfc0b736d7c043667cdd4373b
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 26 16:31:15 2022 +0800

    [INLONG-3369] Add streamsource in group/list Api (#3374)
---
 .../inlong/common/enums/ComponentTypeEnum.java     | 38 +++++++++++
 .../apache/inlong/dataproxy/sink/PulsarSink.java   |  3 +-
 .../inlong/manager/client/api/InlongClient.java    | 13 +++-
 .../manager/client/api/impl/InlongClientImpl.java  | 73 ++++++++++++++++++++--
 .../common/pojo/group/InlongGroupListResponse.java |  5 ++
 .../common/pojo/group/InlongGroupPageRequest.java  |  2 +
 .../dao/mapper/StreamSourceEntityMapper.java       |  5 ++
 .../resources/mappers/StreamSourceEntityMapper.xml | 12 ++++
 .../service/core/impl/AgentServiceImpl.java        | 20 +++++-
 .../service/core/impl/InlongGroupServiceImpl.java  | 31 +++++++++
 .../controller/openapi/DBCollectorController.java  |  1 +
 11 files changed, 195 insertions(+), 8 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
new file mode 100644
index 0000000..3361ea9
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+import lombok.Getter;
+
+public enum ComponentTypeEnum {
+    // InLong component types; getName() is the display name, e.g. the component name in config-log reports.
+    Agent("Inlong-Agent"),
+
+    DataProxy("Inlong-DataProxy"),
+
+    Cache("Inlong-Cache"),
+
+    Sort("Inlong-Sort");
+
+    @Getter
+    private final String name; // final: enum constants are shared, so the name must not change after construction
+
+    ComponentTypeEnum(String name) {
+        this.name = name;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index db81416..8c0e844 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -34,6 +34,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.monitor.LogCounter;
 import org.apache.inlong.common.monitor.MonitorIndex;
@@ -233,7 +234,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
                     .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
             String clientVersion = commonProperties
                     .getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
-            streamConfigLogMetric = new StreamConfigLogMetric(ConfigConstants.COMPONENT_NAME,
+            streamConfigLogMetric = new StreamConfigLogMetric(ComponentTypeEnum.DataProxy.getName(),
                     reportConfigServerUrl, Long.parseLong(reportConfigLogInterval),
                     localIp, clientVersion);
             pulsarClientService.setConfigLogMetric(streamConfigLogMetric);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index 472ce7a..2797b2a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -18,12 +18,14 @@
 package org.apache.inlong.manager.client.api;
 
 import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.client.api.InlongGroupContext.InlongGroupState;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * An interface to manipulate Inlong Cluster
@@ -78,17 +80,26 @@ public interface InlongClient {
      */
     List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) throws Exception;
 
-
     /**
      * List group
      *
      * @param request The request
      * @return PageInfo of group
+     *
      * @throws Exception The exception may throws
      */
     Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception;
 
     /**
+     * List group state
+     *
+     * @param groupNames names of the groups whose state should be listed
+     * @return map from group identifier to the state of that group
+     * @throws Exception if the group states cannot be retrieved
+     */
+    Map<String, InlongGroupState> listGroupState(List<String> groupNames) throws Exception;
+
+    /**
      * Gets group.
      *
      * @param groupName the group name
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index ac44032..10bf412 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.client.api.impl;
 import com.github.pagehelper.PageInfo;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
@@ -28,12 +29,15 @@ import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupConf;
+import org.apache.inlong.manager.client.api.InlongGroupContext.InlongGroupState;
+import org.apache.inlong.manager.client.api.StreamSource.State;
 import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
 import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -97,16 +101,41 @@ public class InlongClientImpl implements InlongClient {
         }
     }
 
+    @Override
+    public Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception {
+        // The paged query is delegated to a fresh inner manager client bound to this instance
+        return new InnerInlongManagerClient(this).listGroups(request);
+    }
+
     /**
-     * List group
+     * List the state of each named group, re-checked against the status of the group's stream sources
      *
-     * @param request The request
-     * @return PageInfo of group
+     * @param groupNames names of the groups to query; its size is also used as the page size
+     * @return map from inlong group id to the re-checked state of that group
+     * @throws Exception if the list request is unsuccessful or carries an error message
      */
     @Override
-    public Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception {
+    public Map<String, InlongGroupState> listGroupState(List<String> groupNames) throws Exception {
         InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
-        return managerClient.listGroups(request);
+        InlongGroupPageRequest request = new InlongGroupPageRequest();
+        request.setNameList(groupNames);
+        request.setPageNum(1);
+        request.setPageSize(groupNames.size()); // NOTE(review): page size 0 for an empty list — confirm
+        request.setListSources(true); // ask for the stream sources of every listed group as well
+        Response<PageInfo<InlongGroupListResponse>> pageInfoResponse = managerClient.listGroups(request);
+        if (!pageInfoResponse.isSuccess() || pageInfoResponse.getErrMsg() != null) {
+            throw new RuntimeException("listGroupStateFailed:" + pageInfoResponse.getErrMsg());
+        }
+        List<InlongGroupListResponse> groupListResponses = pageInfoResponse.getData().getList();
+        Map<String, InlongGroupState> groupStateMap = Maps.newHashMap();
+        groupListResponses.stream().forEach(groupListResponse -> {
+            String groupId = groupListResponse.getInlongGroupId();
+            InlongGroupState groupState = InlongGroupState.parseByBizStatus(groupListResponse.getStatus());
+            List<SourceListResponse> sourceListResponses = groupListResponse.getSourceListResponses();
+            groupState = recheckGroupState(groupState, sourceListResponses); // sources may override the group status
+            groupStateMap.put(groupId, groupState);
+        });
+        return groupStateMap;
     }
 
     @Override
@@ -138,4 +167,38 @@ public class InlongClientImpl implements InlongClient {
             }
         }
     }
+
+    /**
+     * Re-evaluates a group's state against the states of its stream sources.
+     * A FAILED source always yields FAILED; an INIT source turns STARTED into INITIALIZING;
+     * a FROZING / DELETING source turns STOPPED / DELETED into OPERATING.
+     *
+     * @param groupState state parsed from the group's own status
+     * @param sourceListResponses sources of the group; null or empty leaves groupState unchanged
+     * @return the group state, possibly overridden by the source states
+     */
+    private InlongGroupState recheckGroupState(InlongGroupState groupState,
+            List<SourceListResponse> sourceListResponses) {
+        if (CollectionUtils.isEmpty(sourceListResponses)) {
+            return groupState;
+        }
+        Map<State, List<SourceListResponse>> stateListMap = Maps.newHashMap();
+        for (SourceListResponse source : sourceListResponses) {
+            State state = State.parseByStatus(source.getStatus());
+            stateListMap.computeIfAbsent(state, k -> Lists.newArrayList()).add(source);
+        }
+        if (stateListMap.containsKey(State.FAILED)) {
+            return InlongGroupState.FAILED;
+        }
+        switch (groupState) {
+            case STARTED:
+                return stateListMap.containsKey(State.INIT) ? InlongGroupState.INITIALIZING : groupState;
+            case STOPPED:
+                return stateListMap.containsKey(State.FROZING) ? InlongGroupState.OPERATING : groupState;
+            case DELETED:
+                return stateListMap.containsKey(State.DELETING) ? InlongGroupState.OPERATING : groupState;
+            default:
+                return groupState;
+        }
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java
index 51405bb..a9dd552 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java
@@ -21,8 +21,10 @@ import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 
 import java.util.Date;
+import java.util.List;
 
 /**
  * Inlong group list
@@ -55,4 +57,7 @@ public class InlongGroupListResponse {
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
     private Date modifyTime;
 
+    @ApiModelProperty(value = "StreamSources in group")
+    private List<SourceListResponse> sourceListResponses;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java
index 72a19b3..70c3862 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java
@@ -51,4 +51,6 @@ public class InlongGroupPageRequest extends PageRequest {
     @ApiModelProperty(value = "Current user", hidden = true)
     private String currentUser;
 
+    @ApiModelProperty(value = "If list streamSource for group", hidden = true)
+    private boolean listSources = false;
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 9603275..1cd75af 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -68,6 +68,11 @@ public interface StreamSourceEntityMapper {
             @Param("agentIp") String agentIp, @Param("uuid") String uuid);
 
     /**
+     * Select all non-deleted sources whose group id is one of groupIds (groupIds must not be empty)
+     */
+    List<StreamSourceEntity> selectByGroupIds(@Param("groupIds") List<String> groupIds);
+
+    /**
      * Get the distinct source type from the given groupId and streamId
      */
     List<String> selectSourceType(@Param("groupId") String groupId, @Param("streamId") String streamId);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 963e4bb..fddab40 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -174,6 +174,18 @@
             </if>
         </where>
     </select>
+    <select id="selectByGroupIds" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from stream_source
+        <where> <!-- callers must pass a non-empty groupIds; an empty list leaves the IN clause invalid -->
+            inlong_group_id in
+            <foreach item="item" index="index" collection="groupIds" open="(" close=")" separator=",">
+                #{item}
+            </foreach>
+            and is_deleted = 0
+        </where>
+    </select>
     <select id="selectSourceType" resultType="java.lang.String">
         select distinct (source_type)
         from stream_source
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 23215c1..0ae5b5a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.constant.Constants;
 import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
 import org.apache.inlong.common.pojo.agent.CmdConfig;
 import org.apache.inlong.common.pojo.agent.DataConfig;
 import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -42,6 +43,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo.CommandInfoBean;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
 import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
 import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
@@ -53,6 +55,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.core.StreamConfigLogService;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +94,8 @@ public class AgentServiceImpl implements AgentService {
     private InlongStreamFieldEntityMapper streamFieldMapper;
     @Autowired
     private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private StreamConfigLogService streamConfigLogService;
 
     @Override
     public Boolean reportSnapshot(TaskSnapshotRequest request) {
@@ -135,7 +140,7 @@ public class AgentServiceImpl implements AgentService {
         int nextStatus = SourceState.SOURCE_NORMAL.getCode();
 
         if (Constants.RESULT_FAIL == result) {
-            // TODO Need to save failed reason
+            logFailedStreamSource(current);
             nextStatus = SourceState.SOURCE_FAILED.getCode();
         } else if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
             // Change the status from 30x to normal / disable / frozen
@@ -213,6 +218,19 @@ public class AgentServiceImpl implements AgentService {
         return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
     }
 
+    private void logFailedStreamSource(StreamSourceEntity entity) {
+        InlongStreamConfigLogRequest failureLog = new InlongStreamConfigLogRequest();
+        failureLog.setComponentName(ComponentTypeEnum.Agent.getName());
+        failureLog.setIp(entity.getAgentIp());
+        failureLog.setInlongGroupId(entity.getInlongGroupId());
+        failureLog.setInlongStreamId(entity.getInlongStreamId());
+        failureLog.setConfigName("DataSource:" + entity.getSourceName());
+        failureLog.setLogType(1); // NOTE(review): meaning of log type 1 is not visible here — confirm
+        failureLog.setLogInfo(String.format("StreamSource=%s init failed, please check!", entity));
+        failureLog.setReportTime(System.currentTimeMillis());
+        streamConfigLogService.reportConfigLog(failureLog);
+    }
+
     /**
      * Get the DataConfig from the stream source entity
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index c5516ed..8f7dca9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -20,6 +20,8 @@ package org.apache.inlong.manager.service.core.impl;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
@@ -27,6 +29,7 @@ import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupCountResponse;
@@ -37,17 +40,22 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupPulsarEntity;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupPulsarEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.SourceOperationFactory;
+import org.apache.inlong.manager.service.source.StreamSourceOperation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -61,6 +69,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Inlong group service layer implementation
@@ -76,6 +85,10 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Autowired
     private InlongGroupExtEntityMapper groupExtMapper;
     @Autowired
+    private StreamSourceEntityMapper streamSourceEntityMapper;
+    @Autowired
+    private SourceOperationFactory operationFactory;
+    @Autowired
     private CommonOperateService commonOperateService;
     @Autowired
     private InlongStreamService streamService;
@@ -206,6 +219,24 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         Page<InlongGroupEntity> entityPage = (Page<InlongGroupEntity>) groupMapper.selectByCondition(request);
         List<InlongGroupListResponse> groupList = CommonBeanUtils.copyListProperties(entityPage,
                 InlongGroupListResponse::new);
+        if (request.isListSources() && CollectionUtils.isNotEmpty(groupList)) {
+            List<String> groupIds = groupList.stream().map(response -> response.getInlongGroupId())
+                    .collect(Collectors.toList());
+            List<StreamSourceEntity> sourceEntities = streamSourceEntityMapper.selectByGroupIds(groupIds);
+            Map<String, List<SourceListResponse>> sourceMap = Maps.newHashMap();
+            sourceEntities.stream().forEach(sourceEntity -> {
+                SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+                StreamSourceOperation operation = operationFactory.getInstance(sourceType);
+                SourceListResponse sourceListResponse = operation.getFromEntity(sourceEntity, SourceListResponse::new);
+                sourceMap.computeIfAbsent(sourceEntity.getInlongGroupId(), k -> Lists.newArrayList())
+                        .add(sourceListResponse);
+            });
+            groupList.stream().forEach(group -> {
+                List<SourceListResponse> sourceListResponses = sourceMap.getOrDefault(group.getInlongGroupId(),
+                        Lists.newArrayList());
+                group.setSourceListResponses(sourceListResponses);
+            });
+        }
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
         PageInfo<InlongGroupListResponse> page = new PageInfo<>(groupList);
         page.setTotal(entityPage.getTotal());
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java
index 88b9dfb..0053a50 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java
@@ -33,6 +33,7 @@ import org.springframework.web.bind.annotation.RestController;
 @RestController
 @RequestMapping("/openapi/dbCollector")
 @Api(tags = "DBCollector Config")
+@Deprecated
 public class DBCollectorController {
 
     @Autowired