You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2022/03/26 08:31:21 UTC
[incubator-inlong] branch master updated: [INLONG-3369] Add streamsource in group/list Api (#3374)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 305d458 [INLONG-3369] Add streamsource in group/list Api (#3374)
305d458 is described below
commit 305d458b7647066cfc0b736d7c043667cdd4373b
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 26 16:31:15 2022 +0800
[INLONG-3369] Add streamsource in group/list Api (#3374)
---
.../inlong/common/enums/ComponentTypeEnum.java | 38 +++++++++++
.../apache/inlong/dataproxy/sink/PulsarSink.java | 3 +-
.../inlong/manager/client/api/InlongClient.java | 13 +++-
.../manager/client/api/impl/InlongClientImpl.java | 73 ++++++++++++++++++++--
.../common/pojo/group/InlongGroupListResponse.java | 5 ++
.../common/pojo/group/InlongGroupPageRequest.java | 2 +
.../dao/mapper/StreamSourceEntityMapper.java | 5 ++
.../resources/mappers/StreamSourceEntityMapper.xml | 12 ++++
.../service/core/impl/AgentServiceImpl.java | 20 +++++-
.../service/core/impl/InlongGroupServiceImpl.java | 31 +++++++++
.../controller/openapi/DBCollectorController.java | 1 +
11 files changed, 195 insertions(+), 8 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
new file mode 100644
index 0000000..3361ea9
--- /dev/null
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/ComponentTypeEnum.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.common.enums;
+
+import lombok.Getter;
+
+/**
+ * Components of InLong, each paired with the component name that is passed along
+ * when the component reports config logs.
+ */
+public enum ComponentTypeEnum {
+
+    Agent("Inlong-Agent"),
+
+    DataProxy("Inlong-DataProxy"),
+
+    Cache("Inlong-Cache"),
+
+    Sort("Inlong-Sort");
+
+    /**
+     * Component name; read through the Lombok-generated {@code getName()}.
+     * Final because enum constants are shared singletons and must not be mutated.
+     */
+    @Getter
+    private final String name;
+
+    ComponentTypeEnum(String name) {
+        this.name = name;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index db81416..8c0e844 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -34,6 +34,7 @@ import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.monitor.MonitorIndex;
@@ -233,7 +234,7 @@ public class PulsarSink extends AbstractSink implements Configurable,
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_INTERVAL, "60000");
String clientVersion = commonProperties
.getOrDefault(StreamConfigLogMetric.CONFIG_LOG_REPORT_CLIENT_VERSION, "");
- streamConfigLogMetric = new StreamConfigLogMetric(ConfigConstants.COMPONENT_NAME,
+ streamConfigLogMetric = new StreamConfigLogMetric(ComponentTypeEnum.DataProxy.getName(),
reportConfigServerUrl, Long.parseLong(reportConfigLogInterval),
localIp, clientVersion);
pulsarClientService.setConfigLogMetric(streamConfigLogMetric);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index 472ce7a..2797b2a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -18,12 +18,14 @@
package org.apache.inlong.manager.client.api;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.client.api.InlongGroupContext.InlongGroupState;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import java.util.List;
+import java.util.Map;
/**
* An interface to manipulate Inlong Cluster
@@ -78,17 +80,26 @@ public interface InlongClient {
*/
List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) throws Exception;
-
/**
* List group
*
* @param request The request
* @return PageInfo of group
+ *
* @throws Exception The exception may throws
*/
Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception;
/**
+ * List group state
+ *
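+ * @param groupNames names of the groups whose state is queried
+ * @return map from inlong group id to the state of that group
+ * @throws Exception if the group states cannot be listed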
+ */
+ Map<String, InlongGroupState> listGroupState(List<String> groupNames) throws Exception;
+
+ /**
* Gets group.
*
* @param groupName the group name
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index ac44032..10bf412 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.client.api.impl;
import com.github.pagehelper.PageInfo;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
@@ -28,12 +29,15 @@ import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupConf;
+import org.apache.inlong.manager.client.api.InlongGroupContext.InlongGroupState;
+import org.apache.inlong.manager.client.api.StreamSource.State;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -97,16 +101,41 @@ public class InlongClientImpl implements InlongClient {
}
}
+    /**
+     * List groups by delegating the page request to a fresh {@link InnerInlongManagerClient}.
+     *
+     * @param request the page request describing which groups to list
+     * @return the manager's response wrapping a page of groups
+     * @throws Exception if the manager client fails to list the groups
+     */
+    @Override
+    public Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception {
+        return new InnerInlongManagerClient(this).listGroups(request);
+    }
+
/**
- * List group
+ * List the state of each given group, rechecked against the status of the group's stream sources.
*
- * @param request The request
- * @return PageInfo of group
+ * @param groupNames names of the groups to query; its size is also used as the page size
+ * @return map from inlong group id to the rechecked group state
+ * @throws Exception if listing fails; a RuntimeException is thrown when the response is unsuccessful or has an error message
*/
@Override
- public Response<PageInfo<InlongGroupListResponse>> listGroup(InlongGroupPageRequest request) throws Exception {
+ public Map<String, InlongGroupState> listGroupState(List<String> groupNames) throws Exception {
InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
- return managerClient.listGroups(request);
+ InlongGroupPageRequest request = new InlongGroupPageRequest();
+ request.setNameList(groupNames);
+ request.setPageNum(1);
+ // NOTE(review): groupNames is dereferenced without a null check and an empty list gives page size 0 - confirm callers never pass either.
+ request.setPageSize(groupNames.size());
+ // Ask the manager to attach each group's stream sources so their status can be rechecked below.
+ request.setListSources(true);
+ Response<PageInfo<InlongGroupListResponse>> pageInfoResponse = managerClient.listGroups(request);
+ if (!pageInfoResponse.isSuccess() || pageInfoResponse.getErrMsg() != null) {
+ throw new RuntimeException("listGroupStateFailed:" + pageInfoResponse.getErrMsg());
+ }
+ // NOTE(review): assumes getData() is non-null on a successful response - TODO confirm.
+ List<InlongGroupListResponse> groupListResponses = pageInfoResponse.getData().getList();
+ Map<String, InlongGroupState> groupStateMap = Maps.newHashMap();
+ groupListResponses.stream().forEach(groupListResponse -> {
+ String groupId = groupListResponse.getInlongGroupId();
+ InlongGroupState groupState = InlongGroupState.parseByBizStatus(groupListResponse.getStatus());
+ List<SourceListResponse> sourceListResponses = groupListResponse.getSourceListResponses();
+ groupState = recheckGroupState(groupState, sourceListResponses);
+ groupStateMap.put(groupId, groupState);
+ });
+ return groupStateMap;
}
@Override
@@ -138,4 +167,38 @@ public class InlongClientImpl implements InlongClient {
}
}
}
+
+    /**
+     * Refine a group state using the states of the group's stream sources.
+     *
+     * <p>Any source in {@code FAILED} state makes the group {@code FAILED}. Otherwise a
+     * {@code STARTED} group with a source still in {@code INIT} is reported as
+     * {@code INITIALIZING}, a {@code STOPPED} group with a source in {@code FROZING} and a
+     * {@code DELETED} group with a source in {@code DELETING} are reported as {@code OPERATING}.
+     * In every other case the given group state is returned unchanged.
+     *
+     * @param groupState the state parsed from the group's own status
+     * @param sourceListResponses the stream sources of the group; may be null or empty,
+     *         in which case the group state is returned as is
+     * @return the refined group state
+     */
+    private InlongGroupState recheckGroupState(InlongGroupState groupState,
+            List<SourceListResponse> sourceListResponses) {
+        // Without sources nothing can override the group state; this also avoids a
+        // NullPointerException when the response carries no source list at all.
+        if (CollectionUtils.isEmpty(sourceListResponses)) {
+            return groupState;
+        }
+        Map<State, List<SourceListResponse>> stateListMap = Maps.newHashMap();
+        for (SourceListResponse sourceListResponse : sourceListResponses) {
+            State state = State.parseByStatus(sourceListResponse.getStatus());
+            stateListMap.computeIfAbsent(state, k -> Lists.newArrayList()).add(sourceListResponse);
+        }
+        // A list is only created when a source is added to it, so key presence means "non-empty".
+        if (stateListMap.containsKey(State.FAILED)) {
+            return InlongGroupState.FAILED;
+        }
+        switch (groupState) {
+            case STARTED:
+                return stateListMap.containsKey(State.INIT) ? InlongGroupState.INITIALIZING : groupState;
+            case STOPPED:
+                return stateListMap.containsKey(State.FROZING) ? InlongGroupState.OPERATING : groupState;
+            case DELETED:
+                return stateListMap.containsKey(State.DELETING) ? InlongGroupState.OPERATING : groupState;
+            default:
+                return groupState;
+        }
+    }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java
index 51405bb..a9dd552 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupListResponse.java
@@ -21,8 +21,10 @@ import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import java.util.Date;
+import java.util.List;
/**
* Inlong group list
@@ -55,4 +57,7 @@ public class InlongGroupListResponse {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date modifyTime;
+ @ApiModelProperty(value = "StreamSources in group")
+ private List<SourceListResponse> sourceListResponses;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java
index 72a19b3..70c3862 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPageRequest.java
@@ -51,4 +51,6 @@ public class InlongGroupPageRequest extends PageRequest {
@ApiModelProperty(value = "Current user", hidden = true)
private String currentUser;
+ @ApiModelProperty(value = "If list streamSource for group", hidden = true)
+ private boolean listSources = false;
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 9603275..1cd75af 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -68,6 +68,11 @@ public interface StreamSourceEntityMapper {
@Param("agentIp") String agentIp, @Param("uuid") String uuid);
/**
+ * Select all sources with is_deleted = 0 that belong to any of the given group ids
+ */
+ List<StreamSourceEntity> selectByGroupIds(@Param("groupIds") List<String> groupIds);
+
+ /**
* Get the distinct source type from the given groupId and streamId
*/
List<String> selectSourceType(@Param("groupId") String groupId, @Param("streamId") String streamId);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index 963e4bb..fddab40 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -174,6 +174,18 @@
</if>
</where>
</select>
+ <!-- Select every stream source with is_deleted = 0 whose inlong_group_id is one of groupIds. -->
+ <!-- NOTE(review): groupIds presumably must be non-empty, otherwise the IN clause gets no values; confirm every caller guards this. -->
+ <select id="selectByGroupIds" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source
+ <where>
+ inlong_group_id in
+ <foreach item="item" index="index" collection="groupIds" open="(" close=")" separator=",">
+ #{item}
+ </foreach>
+ and is_deleted = 0
+ </where>
+ </select>
<select id="selectSourceType" resultType="java.lang.String">
select distinct (source_type)
from stream_source
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 23215c1..0ae5b5a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
@@ -42,6 +43,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo;
import org.apache.inlong.manager.common.pojo.agent.FileAgentCommandInfo.CommandInfoBean;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskConfig;
import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
@@ -53,6 +55,7 @@ import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.core.AgentService;
+import org.apache.inlong.manager.service.core.StreamConfigLogService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,6 +94,8 @@ public class AgentServiceImpl implements AgentService {
private InlongStreamFieldEntityMapper streamFieldMapper;
@Autowired
private InlongStreamEntityMapper streamMapper;
+ @Autowired
+ private StreamConfigLogService streamConfigLogService;
@Override
public Boolean reportSnapshot(TaskSnapshotRequest request) {
@@ -135,7 +140,7 @@ public class AgentServiceImpl implements AgentService {
int nextStatus = SourceState.SOURCE_NORMAL.getCode();
if (Constants.RESULT_FAIL == result) {
- // TODO Need to save failed reason
+ logFailedStreamSource(current);
nextStatus = SourceState.SOURCE_FAILED.getCode();
} else if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
// Change the status from 30x to normal / disable / frozen
@@ -213,6 +218,19 @@ public class AgentServiceImpl implements AgentService {
return TaskResult.builder().dataConfigs(dataConfigs).cmdConfigs(cmdConfigs).build();
}
+    /**
+     * Report a config log entry for a stream source whose reported result was a failure.
+     *
+     * <p>The entry is attributed to the Agent component and the source's agent IP, and is
+     * handed to {@code streamConfigLogService}.
+     *
+     * @param entity the stream source that failed
+     */
+    private void logFailedStreamSource(StreamSourceEntity entity) {
+        final String configName = "DataSource:" + entity.getSourceName();
+        final String logInfo = String.format("StreamSource=%s init failed, please check!", entity);
+
+        InlongStreamConfigLogRequest logRequest = new InlongStreamConfigLogRequest();
+        // Which group/stream, component and host the log entry belongs to.
+        logRequest.setInlongGroupId(entity.getInlongGroupId());
+        logRequest.setInlongStreamId(entity.getInlongStreamId());
+        logRequest.setComponentName(ComponentTypeEnum.Agent.getName());
+        logRequest.setIp(entity.getAgentIp());
+        // What is being reported.
+        logRequest.setConfigName(configName);
+        // NOTE(review): meaning of log type 1 is not visible here - presumably an error level; confirm.
+        logRequest.setLogType(1);
+        logRequest.setLogInfo(logInfo);
+        logRequest.setReportTime(new Date().getTime());
+        streamConfigLogService.reportConfigLog(logRequest);
+    }
+
/**
* Get the DataConfig from the stream source entity
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index c5516ed..8f7dca9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -20,6 +20,8 @@ package org.apache.inlong.manager.service.core.impl;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
@@ -27,6 +29,7 @@ import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupCountResponse;
@@ -37,17 +40,22 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupPulsarEntity;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupPulsarEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.SourceOperationFactory;
+import org.apache.inlong.manager.service.source.StreamSourceOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -61,6 +69,7 @@ import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Inlong group service layer implementation
@@ -76,6 +85,10 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Autowired
private InlongGroupExtEntityMapper groupExtMapper;
@Autowired
+ private StreamSourceEntityMapper streamSourceEntityMapper;
+ @Autowired
+ private SourceOperationFactory operationFactory;
+ @Autowired
private CommonOperateService commonOperateService;
@Autowired
private InlongStreamService streamService;
@@ -206,6 +219,24 @@ public class InlongGroupServiceImpl implements InlongGroupService {
Page<InlongGroupEntity> entityPage = (Page<InlongGroupEntity>) groupMapper.selectByCondition(request);
List<InlongGroupListResponse> groupList = CommonBeanUtils.copyListProperties(entityPage,
InlongGroupListResponse::new);
+ if (request.isListSources() && CollectionUtils.isNotEmpty(groupList)) {
+ List<String> groupIds = groupList.stream().map(response -> response.getInlongGroupId())
+ .collect(Collectors.toList());
+ List<StreamSourceEntity> sourceEntities = streamSourceEntityMapper.selectByGroupIds(groupIds);
+ Map<String, List<SourceListResponse>> sourceMap = Maps.newHashMap();
+ sourceEntities.stream().forEach(sourceEntity -> {
+ SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+ StreamSourceOperation operation = operationFactory.getInstance(sourceType);
+ SourceListResponse sourceListResponse = operation.getFromEntity(sourceEntity, SourceListResponse::new);
+ sourceMap.computeIfAbsent(sourceEntity.getInlongGroupId(), k -> Lists.newArrayList())
+ .add(sourceListResponse);
+ });
+ groupList.stream().forEach(group -> {
+ List<SourceListResponse> sourceListResponses = sourceMap.getOrDefault(group.getInlongGroupId(),
+ Lists.newArrayList());
+ group.setSourceListResponses(sourceListResponses);
+ });
+ }
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
PageInfo<InlongGroupListResponse> page = new PageInfo<>(groupList);
page.setTotal(entityPage.getTotal());
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java
index 88b9dfb..0053a50 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/DBCollectorController.java
@@ -33,6 +33,7 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/openapi/dbCollector")
@Api(tags = "DBCollector Config")
+@Deprecated
public class DBCollectorController {
@Autowired