You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/16 08:36:19 UTC
[incubator-inlong] branch master updated: [INLONG-3163][Manager] Throw Exception when source name or sink name was duplicate (#3164)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2db382f [INLONG-3163][Manager] Throw Exception when source name or sink name was duplicate (#3164)
2db382f is described below
commit 2db382f104ee63da45eedb23c0bee3700163c3d8
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Mar 16 16:36:13 2022 +0800
[INLONG-3163][Manager] Throw Exception when source name or sink name was duplicate (#3164)
---
.../manager/client/api/impl/InlongStreamImpl.java | 35 +++++++++++++---------
1 file changed, 21 insertions(+), 14 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index a4ca118..fdcc715 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -59,28 +59,35 @@ public class InlongStreamImpl extends InlongStream {
this.name = streamInfo.getName();
List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
- this.streamFields = streamFieldInfos.stream().map(streamFieldInfo -> {
- return new StreamField(streamFieldInfo.getId(),
- FieldType.forName(streamFieldInfo.getFieldType()),
- streamFieldInfo.getFieldName(),
- streamFieldInfo.getFieldComment(),
- streamFieldInfo.getFieldValue()
- );
- }).collect(Collectors.toList());
+ this.streamFields = streamFieldInfos.stream()
+ .map(streamFieldInfo -> new StreamField(
+ streamFieldInfo.getId(),
+ FieldType.forName(streamFieldInfo.getFieldType()),
+ streamFieldInfo.getFieldName(),
+ streamFieldInfo.getFieldComment(),
+ streamFieldInfo.getFieldValue())
+ ).collect(Collectors.toList());
}
List<SinkResponse> sinkList = fullStreamResponse.getSinkInfo();
if (CollectionUtils.isNotEmpty(sinkList)) {
this.streamSinks = sinkList.stream()
- .map(sinkResponse -> {
- return InlongStreamSinkTransfer.parseStreamSink(sinkResponse, null);
- }).collect(Collectors.toMap(StreamSink::getSinkName, streamSink -> streamSink));
+ .map(sinkResponse -> InlongStreamSinkTransfer.parseStreamSink(sinkResponse, null))
+ .collect(Collectors.toMap(StreamSink::getSinkName, streamSink -> streamSink,
+ (sinkName1, sinkName2) -> {
+ throw new RuntimeException(
+ String.format("duplicate sinkName:%s in stream:%s", sinkName1, this.name));
+ }));
}
List<SourceResponse> sourceList = fullStreamResponse.getSourceInfo();
if (CollectionUtils.isNotEmpty(sourceList)) {
this.streamSources = sourceList.stream()
- .map(sourceResponse -> {
- return InlongStreamSourceTransfer.parseStreamSource(sourceResponse);
- }).collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource));
+ .map(sourceResponse -> InlongStreamSourceTransfer.parseStreamSource(sourceResponse))
+ .collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource,
+ (sourceName1, sourceName2) -> {
+ throw new RuntimeException(
+ String.format("duplicate sourceName:%s in stream:%s", sourceName1, this.name));
+ }
+ ));
}
}