You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 05:03:27 UTC

[inlong] 06/08: [INLONG-5694][Manager] Fix the error that occurs when getting the inlong group (#5695)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b6151c6827ec0198b8bcf97dfc93e2d0cccbda4a
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Fri Aug 26 12:51:20 2022 +0800

    [INLONG-5694][Manager] Fix the error that occurs when getting the inlong group (#5695)
---
 .../manager/plugin/listener/DeleteSortListener.java     | 12 ++++++------
 .../manager/plugin/listener/DeleteStreamListener.java   | 12 ++++++------
 .../manager/plugin/listener/RestartSortListener.java    | 14 +++++++-------
 .../manager/plugin/listener/RestartStreamListener.java  | 14 +++++++-------
 .../manager/plugin/listener/StartupStreamListener.java  | 14 +++++++-------
 .../manager/plugin/listener/SuspendSortListener.java    | 12 ++++++------
 .../manager/plugin/listener/SuspendStreamListener.java  | 12 ++++++------
 .../service/core/impl/ConsumptionServiceImpl.java       |  8 ++++----
 .../service/core/impl/SortSourceServiceImpl.java        | 17 +++++++++--------
 .../manager/service/group/InlongGroupServiceImpl.java   |  5 +++--
 .../workflow/core/impl/WorkflowQueryServiceImpl.java    |  5 +++--
 11 files changed, 64 insertions(+), 61 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index f2dd23537..227674f6e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -88,8 +88,8 @@ public class DeleteSortListener implements SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index d54c6aa56..96c856781 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class DeleteStreamListener implements SortOperateListener {
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index fc4ef876c..2b2218b52 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -23,22 +23,22 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.plugin.flink.FlinkOperation;
 import org.apache.inlong.manager.plugin.flink.FlinkService;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class RestartSortListener implements SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index cdf570e5e..ab94de669 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -23,24 +23,24 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -90,8 +90,8 @@ public class RestartStreamListener implements SortOperateListener {
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 9c37449f2..1ee9deffb 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -24,24 +24,24 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -97,8 +97,8 @@ public class StartupStreamListener implements SortOperateListener {
             return ListenerResult.success();
         }
 
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 88428c118..59eb35e32 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -88,8 +88,8 @@ public class SuspendSortListener implements SortOperateListener {
         List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
         log.info("inlong group ext info: {}", extList);
 
-        Map<String, String> kvConf = extList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
         if (StringUtils.isEmpty(sortExt)) {
             String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index c3b2e21ad..f1e13ddf6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
 
@@ -89,8 +89,8 @@ public class SuspendStreamListener implements SortOperateListener {
         log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        Map<String, String> kvConf = groupExtList.stream().collect(
-                Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> kvConf = new HashMap<>();
+        groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
         streamExtList.stream().forEach(extInfo -> {
             kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
         });
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index e7a1c9e12..8a899a390 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
 import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.pojo.common.CountInfo;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
@@ -62,6 +61,7 @@ import org.springframework.util.CollectionUtils;
 
 import java.util.Arrays;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -98,9 +98,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
     @Override
     public ConsumptionSummary getSummary(ConsumptionQuery query) {
-        Map<String, Integer> countMap = consumptionMapper.countByQuery(query)
-                .stream()
-                .collect(Collectors.toMap(CountInfo::getKey, CountInfo::getValue));
+        Map<String, Integer> countMap = new HashMap<>();
+        consumptionMapper.countByQuery(query)
+                .forEach(countInfo -> countMap.put(countInfo.getKey(), countInfo.getValue()));
 
         return ConsumptionSummary.builder()
                 .totalCount(countMap.values().stream().mapToInt(c -> c).sum())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index dc51b6e9f..0510ce2f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -42,6 +42,7 @@ import org.springframework.transaction.annotation.Transactional;
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -235,11 +236,11 @@ public class SortSourceServiceImpl implements SortSourceService {
 
             task2Group.forEach((task, groupList) -> {
                 // get topic properties under this cluster and task, group them by group id.
-                Map<String, Map<String, String>> group2topicProp = allStreamInfos.stream()
-                        .filter(stream -> stream.getSortTaskName().equals(task)
-                                && stream.getSortClusterName().equals(clusterName))
-                        .collect(Collectors.toMap(SortSourceStreamInfo::getGroupId,
-                                SortSourceStreamInfo::getExtParamsMap));
+                Map<String, Map<String, String>> group2topicProp = new HashMap<>();
+                allStreamInfos.stream().filter(stream -> stream.getSortTaskName().equals(task)
+                        && stream.getSortClusterName().equals(clusterName)).forEach(
+                        sortSourceStreamInfo -> group2topicProp.put(sortSourceStreamInfo.getGroupId(),
+                                sortSourceStreamInfo.getExtParamsMap()));
 
                 Map<String, CacheZone> cacheZones;
                 try {
@@ -317,9 +318,9 @@ public class SortSourceServiceImpl implements SortSourceService {
         List<String> tags = new ArrayList<>(tag2GroupInfos.keySet());
 
         // Clusters that related to these tags
-        Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = allTag2ClusterInfos.entrySet().stream()
-                .filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = new HashMap<>();
+        allTag2ClusterInfos.entrySet().stream().filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
+                .forEach(entry -> tag2ClusterInfos.put(entry.getKey(), entry.getValue()));
 
         // get CacheZone list
         return tags.stream()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 65aecd2e3..d9c5f1c63 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -70,6 +70,7 @@ import org.springframework.validation.annotation.Validated;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -422,8 +423,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     }
 
     private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
-        Map<String, String> extMap = extInfos.stream()
-                .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+        Map<String, String> extMap = new HashMap<>();
+        extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue()));
         String type = extMap.get(InlongConstants.SORT_TYPE);
         if (StringUtils.isBlank(type)) {
             return null;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
index b34bc8e83..5eac67e94 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
@@ -62,6 +62,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -118,8 +119,8 @@ public class WorkflowQueryServiceImpl implements WorkflowQueryService {
     public ProcessCountResponse countProcess(ProcessCountRequest request) {
         List<CountInfo> result = processEntityMapper.countByQuery(request);
 
-        Map<String, Integer> countByState = result.stream()
-                .collect(Collectors.toMap(CountInfo::getKey, CountInfo::getValue));
+        Map<String, Integer> countByState = new HashMap<>();
+        result.forEach(countInfo -> countByState.put(countInfo.getKey(), countInfo.getValue()));
 
         return ProcessCountResponse.builder()
                 .totalApplyCount(countByState.values().stream().mapToInt(c -> c).sum())