You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 04:51:25 UTC
[inlong] branch master updated: [INLONG-5694][Manager] Fix the problem that gets the inlong group error (#5695)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a133995d1 [INLONG-5694][Manager] Fix the problem that gets the inlong group error (#5695)
a133995d1 is described below
commit a133995d1a9412cb31822ef737782afaf674d0ea
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Fri Aug 26 12:51:20 2022 +0800
[INLONG-5694][Manager] Fix the problem that gets the inlong group error (#5695)
---
.../manager/plugin/listener/DeleteSortListener.java | 12 ++++++------
.../manager/plugin/listener/DeleteStreamListener.java | 12 ++++++------
.../manager/plugin/listener/RestartSortListener.java | 14 +++++++-------
.../manager/plugin/listener/RestartStreamListener.java | 14 +++++++-------
.../manager/plugin/listener/StartupStreamListener.java | 14 +++++++-------
.../manager/plugin/listener/SuspendSortListener.java | 12 ++++++------
.../manager/plugin/listener/SuspendStreamListener.java | 12 ++++++------
.../service/core/impl/ConsumptionServiceImpl.java | 8 ++++----
.../service/core/impl/SortSourceServiceImpl.java | 17 +++++++++--------
.../manager/service/group/InlongGroupServiceImpl.java | 5 +++--
.../workflow/core/impl/WorkflowQueryServiceImpl.java | 5 +++--
11 files changed, 64 insertions(+), 61 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index f2dd23537..227674f6e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -88,8 +88,8 @@ public class DeleteSortListener implements SortOperateListener {
List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
log.info("inlong group ext info: {}", extList);
- Map<String, String> kvConf = extList.stream().collect(
- Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> kvConf = new HashMap<>();
+ extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("delete sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
index d54c6aa56..96c856781 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -89,8 +89,8 @@ public class DeleteStreamListener implements SortOperateListener {
log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
- Map<String, String> kvConf = groupExtList.stream().collect(
- Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> kvConf = new HashMap<>();
+ groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index fc4ef876c..2b2218b52 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -23,22 +23,22 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.plugin.flink.FlinkOperation;
import org.apache.inlong.manager.plugin.flink.FlinkService;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.Constants;
+import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -89,8 +89,8 @@ public class RestartSortListener implements SortOperateListener {
List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
log.info("inlong group ext info: {}", extList);
- Map<String, String> kvConf = extList.stream().collect(
- Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> kvConf = new HashMap<>();
+ extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("restart sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index cdf570e5e..ab94de669 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -23,24 +23,24 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -90,8 +90,8 @@ public class RestartStreamListener implements SortOperateListener {
log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
- Map<String, String> kvConf = groupExtList.stream().collect(
- Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> kvConf = new HashMap<>();
+ groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
index 9c37449f2..1ee9deffb 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
@@ -24,24 +24,24 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -97,8 +97,8 @@ public class StartupStreamListener implements SortOperateListener {
return ListenerResult.success();
}
- Map<String, String> kvConf = groupExtList.stream().collect(
- Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> kvConf = new HashMap<>();
+ groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
index 88428c118..59eb35e32 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
@@ -23,21 +23,21 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -88,8 +88,8 @@ public class SuspendSortListener implements SortOperateListener {
List<InlongGroupExtInfo> extList = inlongGroupInfo.getExtList();
log.info("inlong group ext info: {}", extList);
- Map<String, String> kvConf = extList.stream().collect(
- Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> kvConf = new HashMap<>();
+ extList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
String sortExt = kvConf.get(InlongConstants.SORT_PROPERTIES);
if (StringUtils.isEmpty(sortExt)) {
String message = String.format("suspend sort failed for groupId [%s], as the sort properties is empty",
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
index c3b2e21ad..f1e13ddf6 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
@@ -23,23 +23,23 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
+import org.apache.inlong.manager.plugin.flink.FlinkOperation;
+import org.apache.inlong.manager.plugin.flink.FlinkService;
+import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.plugin.flink.FlinkOperation;
-import org.apache.inlong.manager.plugin.flink.FlinkService;
-import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.inlong.manager.plugin.util.FlinkUtils.getExceptionStackMsg;
@@ -89,8 +89,8 @@ public class SuspendStreamListener implements SortOperateListener {
log.info("inlong stream :{} ext info: {}", streamInfo.getInlongStreamId(), streamExtList);
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
- Map<String, String> kvConf = groupExtList.stream().collect(
- Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> kvConf = new HashMap<>();
+ groupExtList.forEach(groupExtInfo -> kvConf.put(groupExtInfo.getKeyName(), groupExtInfo.getKeyValue()));
streamExtList.stream().forEach(extInfo -> {
kvConf.put(extInfo.getKeyName(), extInfo.getKeyValue());
});
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index e7a1c9e12..8a899a390 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -36,7 +36,6 @@ import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.pojo.common.CountInfo;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
@@ -62,6 +61,7 @@ import org.springframework.util.CollectionUtils;
import java.util.Arrays;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@@ -98,9 +98,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
@Override
public ConsumptionSummary getSummary(ConsumptionQuery query) {
- Map<String, Integer> countMap = consumptionMapper.countByQuery(query)
- .stream()
- .collect(Collectors.toMap(CountInfo::getKey, CountInfo::getValue));
+ Map<String, Integer> countMap = new HashMap<>();
+ consumptionMapper.countByQuery(query)
+ .forEach(countInfo -> countMap.put(countInfo.getKey(), countInfo.getValue()));
return ConsumptionSummary.builder()
.totalCount(countMap.values().stream().mapToInt(c -> c).sum())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index dc51b6e9f..0510ce2f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -42,6 +42,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -235,11 +236,11 @@ public class SortSourceServiceImpl implements SortSourceService {
task2Group.forEach((task, groupList) -> {
// get topic properties under this cluster and task, group them by group id.
- Map<String, Map<String, String>> group2topicProp = allStreamInfos.stream()
- .filter(stream -> stream.getSortTaskName().equals(task)
- && stream.getSortClusterName().equals(clusterName))
- .collect(Collectors.toMap(SortSourceStreamInfo::getGroupId,
- SortSourceStreamInfo::getExtParamsMap));
+ Map<String, Map<String, String>> group2topicProp = new HashMap<>();
+ allStreamInfos.stream().filter(stream -> stream.getSortTaskName().equals(task)
+ && stream.getSortClusterName().equals(clusterName)).forEach(
+ sortSourceStreamInfo -> group2topicProp.put(sortSourceStreamInfo.getGroupId(),
+ sortSourceStreamInfo.getExtParamsMap()));
Map<String, CacheZone> cacheZones;
try {
@@ -317,9 +318,9 @@ public class SortSourceServiceImpl implements SortSourceService {
List<String> tags = new ArrayList<>(tag2GroupInfos.keySet());
// Clusters that related to these tags
- Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = allTag2ClusterInfos.entrySet().stream()
- .filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ Map<String, List<SortSourceClusterInfo>> tag2ClusterInfos = new HashMap<>();
+ allTag2ClusterInfos.entrySet().stream().filter(entry -> tag2GroupInfos.containsKey(entry.getKey()))
+ .forEach(entry -> tag2ClusterInfos.put(entry.getKey(), entry.getValue()));
// get CacheZone list
return tags.stream()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 65aecd2e3..d9c5f1c63 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -70,6 +70,7 @@ import org.springframework.validation.annotation.Validated;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -422,8 +423,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
}
private BaseSortConf buildSortConfig(List<InlongGroupExtInfo> extInfos) {
- Map<String, String> extMap = extInfos.stream()
- .collect(Collectors.toMap(InlongGroupExtInfo::getKeyName, InlongGroupExtInfo::getKeyValue));
+ Map<String, String> extMap = new HashMap<>();
+ extInfos.forEach(extInfo -> extMap.put(extInfo.getKeyName(), extInfo.getKeyValue()));
String type = extMap.get(InlongConstants.SORT_TYPE);
if (StringUtils.isBlank(type)) {
return null;
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
index b34bc8e83..5eac67e94 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/WorkflowQueryServiceImpl.java
@@ -62,6 +62,7 @@ import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -118,8 +119,8 @@ public class WorkflowQueryServiceImpl implements WorkflowQueryService {
public ProcessCountResponse countProcess(ProcessCountRequest request) {
List<CountInfo> result = processEntityMapper.countByQuery(request);
- Map<String, Integer> countByState = result.stream()
- .collect(Collectors.toMap(CountInfo::getKey, CountInfo::getValue));
+ Map<String, Integer> countByState = new HashMap<>();
+ result.forEach(countInfo -> countByState.put(countInfo.getKey(), countInfo.getValue()));
return ProcessCountResponse.builder()
.totalApplyCount(countByState.values().stream().mapToInt(c -> c).sum())