Posted to commits@inlong.apache.org by "healchow (via GitHub)" <gi...@apache.org> on 2023/03/13 06:31:06 UTC

[GitHub] [inlong] healchow commented on a diff in pull request #7578: [INLONG-7577][Manager] Fix high CPU usage when the number of StreamSource is too large

healchow commented on code in PR #7578:
URL: https://github.com/apache/inlong/pull/7578#discussion_r1133487231


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java:
##########
@@ -338,31 +334,25 @@ private void preProcessTemplateFileTask(TaskRequest taskRequest) {
         // find those node whose tag match stream_source tag and agent ip match stream_source agent ip
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectTemplateSourceByCluster(needCopiedStatusList,
                 Lists.newArrayList(SourceType.FILE), agentClusterName);
-        sourceEntities.stream()
-                .filter(sourceEntity -> sourceEntity.getTemplateId() == null) // only apply template task
-                .map(sourceEntity -> {
-                    List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
-                    Optional<StreamSourceEntity> optionalSource = subSources.stream()
-                            .filter(subSource -> subSource.getAgentIp().equals(agentIp))
-                            .findAny();
-                    return Pair.<StreamSourceEntity, Optional>of(sourceEntity, optionalSource);
-                }).filter(parAndSonEntity -> !parAndSonEntity.getValue().isPresent()) // haven't cloned subtask
-                .forEach(parAndSonEntity -> {
-                    // if not, clone a subtask for this Agent.
-                    // note: a new source name with random suffix is generated to adhere to the unique constraint
-                    StreamSourceEntity sourceEntity = parAndSonEntity.getKey();
-                    StreamSourceEntity fileEntity =
-                            CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
-                    fileEntity.setSourceName(fileEntity.getSourceName() + "-"
-                            + RandomStringUtils.randomAlphanumeric(10).toLowerCase(Locale.ROOT));
-                    fileEntity.setTemplateId(sourceEntity.getId());
-                    fileEntity.setAgentIp(agentIp);
-                    fileEntity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-                    // create new sub source task
-                    sourceMapper.insert(fileEntity);
-                    LOGGER.info("Transform new template task({}) for agent({}) in cluster({}).",
-                            fileEntity.getId(), taskRequest.getAgentIp(), taskRequest.getClusterName());
-                });
+        sourceEntities.forEach(sourceEntity -> {
+            StreamSourceEntity subSource = sourceMapper.selectOneByTemplatedIdAndAgentIp(sourceEntity.getId(),

Review Comment:
   Are you sure this code is what causes the high CPU usage?
   
   Only one query is made here. If its result set is too large, that should lead to excessive memory usage, which in turn triggers JVM garbage collection.
   
   Changing it to query one record at a time does solve the problem of overly large query results, but the method will then loop too many times and query the DB too frequently, which puts too much pressure on the DB and takes too long.
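
   One possible middle ground, sketched below under stated assumptions, is to ask the DB in a single batched query which of the template IDs already have a sub-source for this agent IP, and then compute the missing ones in memory. This is not what the PR does. The `Source` class, `selectClonedTemplateIds`, and `findTemplatesToClone` are hypothetical names, and the in-memory list only stands in for the stream source table; a real mapper method would have to be added for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TemplateCloneSketch {

    /** Hypothetical, trimmed-down stand-in for StreamSourceEntity. */
    static final class Source {
        final long id;
        final Long templateId; // null for a template source
        final String agentIp;  // null for a template source

        Source(long id, Long templateId, String agentIp) {
            this.id = id;
            this.templateId = templateId;
            this.agentIp = agentIp;
        }
    }

    /**
     * Hypothetical stand-in for ONE batched mapper query: "which of these
     * template IDs already have a sub-source on this agent IP?".
     * Here it only filters an in-memory list that plays the role of the table.
     */
    static Set<Long> selectClonedTemplateIds(List<Source> table, List<Long> templateIds, String agentIp) {
        Set<Long> wanted = new HashSet<>(templateIds);
        Set<Long> cloned = new HashSet<>();
        for (Source row : table) {
            if (row.templateId != null && wanted.contains(row.templateId)
                    && agentIp.equals(row.agentIp)) {
                cloned.add(row.templateId);
            }
        }
        return cloned;
    }

    /** Returns the IDs of templates that still need a sub-source for agentIp. */
    static List<Long> findTemplatesToClone(List<Source> table, List<Source> templates, String agentIp) {
        List<Long> templateIds = new ArrayList<>();
        for (Source template : templates) {
            templateIds.add(template.id);
        }
        // One round trip that returns only IDs, instead of one query per template.
        Set<Long> cloned = selectClonedTemplateIds(table, templateIds, agentIp);
        List<Long> missing = new ArrayList<>();
        for (Long id : templateIds) {
            if (!cloned.contains(id)) {
                missing.add(id);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<Source> templates = Arrays.asList(
                new Source(1L, null, null),
                new Source(2L, null, null),
                new Source(3L, null, null));
        List<Source> table = Arrays.asList(
                new Source(10L, 1L, "10.0.0.1"),  // template 1 already cloned for this agent
                new Source(11L, 2L, "10.0.0.2")); // template 2 cloned only for another agent
        System.out.println(findTemplatesToClone(table, templates, "10.0.0.1")); // prints [2, 3]
    }
}
```

   With this shape the result set is bounded by the number of templates rather than the number of sub-sources, and the number of DB round trips no longer grows with the size of the template list. Whether it actually fixes the CPU problem would still need to be measured.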


