Posted to commits@inlong.apache.org by "gong (via GitHub)" <gi...@apache.org> on 2023/03/13 08:10:39 UTC

[GitHub] [inlong] gong commented on a diff in pull request #7578: [INLONG-7577][Manager] Fix high CPU usage when the number of StreamSource is too large

gong commented on code in PR #7578:
URL: https://github.com/apache/inlong/pull/7578#discussion_r1133563424


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -338,30 +336,24 @@ private void preProcessTemplateFileTask(TaskRequest taskRequest) {
         // find those node whose tag match stream_source tag and agent ip match stream_source agent ip
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
                 Lists.newArrayList(SourceType.FILE), agentClusterName);
-        sourceEntities.stream()
-                .filter(sourceEntity -> sourceEntity.getTemplateId() == null) // only apply template task
-                .map(sourceEntity -> {
-                    List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
-                    Optional<StreamSourceEntity> optionalSource = subSources.stream()
-                            .filter(subSource -> subSource.getAgentIp().equals(agentIp))
-                            .findAny();
-                    return Pair.<StreamSourceEntity, Optional>of(sourceEntity, optionalSource);
-                }).filter(parAndSonEntity -> !parAndSonEntity.getValue().isPresent()) // haven't cloned subtask
-                .forEach(parAndSonEntity -> {
-                    // if not, clone a subtask for this Agent.
-                    // note: a new source name with random suffix is generated to adhere to the unique constraint
-                    StreamSourceEntity sourceEntity = parAndSonEntity.getKey();
-                    StreamSourceEntity fileEntity =
-                            CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
-                    fileEntity.setSourceName(fileEntity.getSourceName() + "-"
-                            + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
-                    fileEntity.setTemplateId(sourceEntity.getId());
-                    fileEntity.setAgentIp(agentIp);
-                    fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-                    // create new sub source task
-                    sourceMapper.insert(fileEntity);
-                    LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
-                            fileEntity.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
+        sourceEntities.forEach(sourceEntity -> {
+                    StreamSourceEntity subSource = sourceMapper.selectExistsByTemplateIdAndIp(sourceEntity.getId(),
+                            agentIp);

Review Comment:
   Maybe the DB load will be too large.
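
The concern can be made concrete. The new loop in the diff calls `selectExistsByTemplateIdAndIp` once for every template source returned by `selectTemplateSourceByCluster`. Each of those calls presumably goes to the database, so N template sources cost N queries per agent request. One common way to bound this is to fetch all sub-sources for the agent IP in a single query and check membership in memory. The sketch below compares the two approaches against an in-memory fake that counts queries. `Source`, `FakeMapper`, and the batch method `selectByTemplateIdsAndIp` are hypothetical stand-ins, not InLong's actual entity or mapper API.

```java
import java.util.*;
import java.util.stream.Collectors;

public class TemplateCloneSketch {
    // Hypothetical, simplified stand-in for StreamSourceEntity (only the fields used here).
    record Source(int id, Integer templateId, String agentIp) {}

    // Hypothetical in-memory mapper; every method call counts as one "DB query".
    static class FakeMapper {
        final List<Source> rows;
        int queryCount = 0;

        FakeMapper(List<Source> rows) { this.rows = rows; }

        // Per-template lookup, shaped like selectExistsByTemplateIdAndIp in the diff.
        Source selectExistsByTemplateIdAndIp(int templateId, String ip) {
            queryCount++;
            return rows.stream()
                    .filter(r -> r.templateId() != null && r.templateId() == templateId
                            && ip.equals(r.agentIp()))
                    .findAny().orElse(null);
        }

        // Hypothetical batch query: all sub-sources of the given templates for one agent IP.
        List<Source> selectByTemplateIdsAndIp(Collection<Integer> templateIds, String ip) {
            queryCount++;
            return rows.stream()
                    .filter(r -> r.templateId() != null && templateIds.contains(r.templateId())
                            && ip.equals(r.agentIp()))
                    .collect(Collectors.toList());
        }
    }

    // One query per template: N templates -> N queries.
    static List<Integer> missingPerTemplate(FakeMapper mapper, List<Source> templates, String ip) {
        List<Integer> missing = new ArrayList<>();
        for (Source t : templates) {
            if (mapper.selectExistsByTemplateIdAndIp(t.id(), ip) == null) {
                missing.add(t.id());
            }
        }
        return missing;
    }

    // One batch query, then an in-memory set lookup per template.
    static List<Integer> missingBatched(FakeMapper mapper, List<Source> templates, String ip) {
        List<Integer> ids = templates.stream().map(Source::id).collect(Collectors.toList());
        Set<Integer> cloned = mapper.selectByTemplateIdsAndIp(ids, ip).stream()
                .map(Source::templateId)
                .collect(Collectors.toSet());
        return ids.stream().filter(id -> !cloned.contains(id)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Source> templates = List.of(
                new Source(1, null, null), new Source(2, null, null), new Source(3, null, null));
        List<Source> subs = List.of(
                new Source(10, 1, "10.0.0.1"), new Source(11, 2, "10.0.0.2"));
        FakeMapper m1 = new FakeMapper(subs);
        FakeMapper m2 = new FakeMapper(subs);
        System.out.println(missingPerTemplate(m1, templates, "10.0.0.1") + " queries=" + m1.queryCount);
        // prints "[2, 3] queries=3"
        System.out.println(missingBatched(m2, templates, "10.0.0.1") + " queries=" + m2.queryCount);
        // prints "[2, 3] queries=1"
    }
}
```

Both versions find the same templates that still need a cloned subtask. The batched version issues a single query no matter how many templates there are. Whether it actually reduces load in practice depends on how the sub-source table is indexed and on how large the ID list gets, because a very long `IN` list carries its own cost.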



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
