Posted to commits@inlong.apache.org by "fuweng11 (via GitHub)" <gi...@apache.org> on 2023/03/13 07:17:04 UTC

[GitHub] [inlong] fuweng11 commented on a diff in pull request #7578: [INLONG-7577][Manager] Fix high CPU usage when the number of StreamSource is too large

fuweng11 commented on code in PR #7578:
URL: https://github.com/apache/inlong/pull/7578#discussion_r1133521905


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -338,31 +334,25 @@ private void preProcessTemplateFileTask(TaskRequest taskRequest) {
         // find those node whose tag match stream_source tag and agent ip match stream_source agent ip
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
                 Lists.newArrayList(SourceType.FILE), agentClusterName);
-        sourceEntities.stream()
-                .filter(sourceEntity -> sourceEntity.getTemplateId() == null) // only apply template task
-                .map(sourceEntity -> {
-                    List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
-                    Optional<StreamSourceEntity> optionalSource = subSources.stream()
-                            .filter(subSource -> subSource.getAgentIp().equals(agentIp))
-                            .findAny();
-                    return Pair.<StreamSourceEntity, Optional>of(sourceEntity, optionalSource);
-                }).filter(parAndSonEntity -> !parAndSonEntity.getValue().isPresent()) // haven't cloned subtask
-                .forEach(parAndSonEntity -> {
-                    // if not, clone a subtask for this Agent.
-                    // note: a new source name with random suffix is generated to adhere to the unique constraint
-                    StreamSourceEntity sourceEntity = parAndSonEntity.getKey();
-                    StreamSourceEntity fileEntity =
-                            CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
-                    fileEntity.setSourceName(fileEntity.getSourceName() + "-"
-                            + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
-                    fileEntity.setTemplateId(sourceEntity.getId());
-                    fileEntity.setAgentIp(agentIp);
-                    fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-                    // create new sub source task
-                    sourceMapper.insert(fileEntity);
-                    LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
-                            fileEntity.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
-                });
+        sourceEntities.forEach(sourceEntity -> {
+            StreamSourceEntity subSource = sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),

Review Comment:
   The previous query pulled out all StreamSources under the templateId and then filtered them in memory.
   With 200 agents, this method would create 500+ StreamSource instances each time it ran, causing heavy GC.
   Filtering is now done directly in the MySQL layer, which avoids creating so many instances and the GC pressure that comes with them.
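   A minimal, self-contained sketch of the difference described above, assuming a simplified model: `SubSource` and `FakeMapper` are hypothetical stand-ins (not InLong's `StreamSourceEntity` or its mapper), and an in-memory list plays the role of the MySQL table. The sketch only counts how many row objects each lookup hands back to Java. It contrasts "fetch every sub-source of the template, then filter by agent IP" with "let the query filter by template ID and agent IP".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical sketch: SubSource and FakeMapper are illustrative stand-ins, not InLong classes.
 * FakeMapper simulates the database with an in-memory list and counts the row objects it returns.
 */
public class FilterPushdownSketch {

    public record SubSource(int id, int templateId, String agentIp) {}

    public static class FakeMapper {
        private final List<SubSource> table = new ArrayList<>();
        /** Number of row objects created for callers so far (garbage once the lookup is done). */
        public long materialized = 0;

        public void insert(SubSource row) {
            table.add(row);
        }

        /** Old shape, roughly "WHERE template_id = ?": returns every sub-source of the template. */
        public List<SubSource> selectByTemplateId(int templateId) {
            List<SubSource> result = new ArrayList<>();
            for (SubSource row : table) {
                if (row.templateId() == templateId) {
                    // An ORM maps each result-set row to a fresh object; model that with a copy.
                    result.add(new SubSource(row.id(), row.templateId(), row.agentIp()));
                    materialized++;
                }
            }
            return result;
        }

        /** New shape, roughly "WHERE template_id = ? AND agent_ip = ? LIMIT 1": at most one row. */
        public SubSource selectOneByTemplateIdAndAgentIp(int templateId, String agentIp) {
            for (SubSource row : table) {
                if (row.templateId() == templateId && row.agentIp().equals(agentIp)) {
                    materialized++;
                    return new SubSource(row.id(), row.templateId(), row.agentIp());
                }
            }
            return null;
        }
    }

    /** Seeds one sub-source per agent for the given template (hypothetical IPs). */
    public static FakeMapper seed(int templateId, int agentCount) {
        FakeMapper mapper = new FakeMapper();
        for (int i = 0; i < agentCount; i++) {
            mapper.insert(new SubSource(i, templateId, "10.0.0." + i));
        }
        return mapper;
    }

    /** Old approach: load all sub-sources of the template, then filter by agent IP in Java. */
    public static Optional<SubSource> findOld(FakeMapper mapper, int templateId, String agentIp) {
        return mapper.selectByTemplateId(templateId).stream()
                .filter(s -> s.agentIp().equals(agentIp))
                .findAny();
    }

    /** New approach: the query itself filters by template ID and agent IP. */
    public static Optional<SubSource> findNew(FakeMapper mapper, int templateId, String agentIp) {
        return Optional.ofNullable(mapper.selectOneByTemplateIdAndAgentIp(templateId, agentIp));
    }

    public static void main(String[] args) {
        FakeMapper oldMapper = seed(1, 200);
        findOld(oldMapper, 1, "10.0.0.7");
        FakeMapper newMapper = seed(1, 200);
        findNew(newMapper, 1, "10.0.0.7");
        // prints "old=200 new=1"
        System.out.println("old=" + oldMapper.materialized + " new=" + newMapper.materialized);
    }
}
```

   In both versions an empty result means this agent has no cloned sub-source yet, so a subtask would need to be created. Only the number of objects built per lookup changes. How cheap the real filtered query is also depends on the indexes on the underlying table, which this sketch does not model.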



-- 
This is an automated message from the Apache Git Service.