You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/05 03:54:46 UTC
[incubator-inlong] branch master updated: [INLONG-2924][Manager] Fix NPE in manager client (#2925)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ebe7992 [INLONG-2924][Manager] Fix NPE in manager client (#2925)
ebe7992 is described below
commit ebe7992efa22dca24f6ce26b25b4699719000417
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 11:54:39 2022 +0800
[INLONG-2924][Manager] Fix NPE in manager client (#2925)
---
.../manager/client/api/impl/InlongGroupImpl.java | 23 +----------
.../manager/client/api/impl/InlongStreamImpl.java | 21 ++++++----
.../client/api/inner/InnerGroupContext.java | 4 +-
.../api/util/InlongStreamSourceTransfer.java | 48 ++++++++++++++++++++++
4 files changed, 65 insertions(+), 31 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 63f2a26..9dbb4f3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -17,11 +17,9 @@
package org.apache.inlong.manager.client.api.impl;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupConf;
@@ -36,14 +34,12 @@ import org.apache.inlong.manager.client.api.util.AssertUtil;
import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.client.api.util.InlongParser;
-import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
@@ -206,23 +202,8 @@ public class InlongGroupImpl implements InlongGroup {
private List<InlongStream> fetchDataStreams(String groupId) {
List<FullStreamResponse> streamResponses = managerClient.listStreamInfo(groupId);
- List<InlongStream> streamList = new ArrayList<>();
- if (CollectionUtils.isNotEmpty(streamResponses)) {
- streamList = streamResponses.stream().map(fullStreamResponse -> {
- List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId,
- fullStreamResponse.getStreamInfo().getInlongStreamId());
- String streamName = fullStreamResponse.getStreamInfo().getName();
- InlongStream stream = groupContext.getStream(streamName);
- InlongStreamImpl inlongStream = new InlongStreamImpl(fullStreamResponse, stream);
- if (CollectionUtils.isNotEmpty(sourceListResponses)) {
- for (SourceListResponse response : sourceListResponses) {
- inlongStream.updateSource(
- InlongStreamSourceTransfer.parseStreamSource(response));
- }
- }
- return inlongStream;
- }).collect(Collectors.toList());
- }
+ List<InlongStream> streamList = streamResponses.stream()
+ .map(fullStreamResponse -> new InlongStreamImpl(fullStreamResponse)).collect(Collectors.toList());
return streamList;
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 0a8bc5f..7727e69 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -32,7 +32,9 @@ import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;
import org.apache.inlong.manager.client.api.util.AssertUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -50,11 +52,9 @@ public class InlongStreamImpl extends InlongStream {
private List<StreamField> streamFields;
- public InlongStreamImpl(FullStreamResponse fullStreamResponse, InlongStream curStreamInfo) {
+ public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
this.name = streamInfo.getName();
- this.streamSinks = curStreamInfo.getSinks();
- this.streamSources = curStreamInfo.getSources();
List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
this.streamFields = streamFieldInfos.stream().map(streamFieldInfo -> {
return new StreamField(streamFieldInfo.getId(),
@@ -66,14 +66,19 @@ public class InlongStreamImpl extends InlongStream {
}).collect(Collectors.toList());
List<SinkResponse> sinkList = fullStreamResponse.getSinkInfo();
if (CollectionUtils.isNotEmpty(sinkList)) {
- Map<String, StreamSink> streamSinks = sinkList.stream()
+ this.streamSinks = sinkList.stream()
.map(sinkResponse -> {
- String sinkName = sinkResponse.getSinkName();
- StreamSink streamSink = this.streamSinks.get(sinkName);
- return InlongStreamSinkTransfer.parseStreamSink(sinkResponse, streamSink);
+ return InlongStreamSinkTransfer.parseStreamSink(sinkResponse, null);
}).collect(Collectors.toMap(StreamSink::getSinkName, streamSink -> streamSink));
- this.streamSinks = streamSinks;
}
+ List<SourceResponse> sourceList = fullStreamResponse.getSourceInfo();
+ if (CollectionUtils.isNotEmpty(sourceList)) {
+ this.streamSources = sourceList.stream()
+ .map(sourceResponse -> {
+ return InlongStreamSourceTransfer.parseStreamSource(sourceResponse);
+ }).collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource));
+ }
+
}
public InlongStreamImpl(String name) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
index 24cf764..fbb8312 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
@@ -37,9 +37,9 @@ public class InnerGroupContext {
private InlongGroupInfo groupInfo;
- private Map<String, InnerStreamContext> streamContextMap;
+ private Map<String, InnerStreamContext> streamContextMap = Maps.newHashMap();
- private Map<String, InlongStream> streamMap;
+ private Map<String, InlongStream> streamMap = Maps.newHashMap();
private Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>> initMsg;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 82ae09e..195be65 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -29,10 +29,13 @@ import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
public class InlongStreamSourceTransfer {
@@ -49,6 +52,18 @@ public class InlongStreamSourceTransfer {
}
}
+ public static StreamSource parseStreamSource(SourceResponse sourceResponse) {
+ String type = sourceResponse.getSourceType();
+ SourceType sourceType = SourceType.forType(type);
+ if (sourceType == SourceType.KAFKA && sourceResponse instanceof KafkaSourceResponse) {
+ return parseKafkaSource((KafkaSourceResponse) sourceResponse);
+ }
+ if (sourceType == SourceType.BINLOG && sourceResponse instanceof BinlogSourceResponse) {
+ return parseMySQLBinlogSource((BinlogSourceResponse) sourceResponse);
+ }
+ throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
+ }
+
public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
String type = sourceListResponse.getSourceType();
SourceType sourceType = SourceType.forType(type);
@@ -61,6 +76,21 @@ public class InlongStreamSourceTransfer {
throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
}
+ private static KafkaSource parseKafkaSource(KafkaSourceResponse kafkaSourceResponse) {
+ KafkaSource kafkaSource = new KafkaSource();
+ kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
+ kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
+ DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
+ kafkaSource.setDataFormat(dataFormat);
+ kafkaSource.setTopic(kafkaSourceResponse.getTopic());
+ kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
+ kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
+ kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
+ kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+ kafkaSource.setSyncType(SyncType.FULL);
+ return kafkaSource;
+ }
+
private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaSourceResponse) {
KafkaSource kafkaSource = new KafkaSource();
kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
@@ -76,6 +106,24 @@ public class InlongStreamSourceTransfer {
return kafkaSource;
}
+ private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceResponse binlogSourceResponse) {
+ MySQLBinlogSource binlogSource = new MySQLBinlogSource();
+ binlogSource.setSourceName(binlogSourceResponse.getSourceName());
+ binlogSource.setHostname(binlogSourceResponse.getHostname());
+ binlogSource.setDataFormat(DataFormat.NONE);
+ binlogSource.setPort(binlogSourceResponse.getPort());
+ DefaultAuthentication defaultAuthentication = new DefaultAuthentication(
+ binlogSourceResponse.getUser(),
+ binlogSourceResponse.getPassword());
+ binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
+ binlogSource.setAuthentication(defaultAuthentication);
+ binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
+ binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
+ List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
+ binlogSource.setDbNames(dbs);
+ return binlogSource;
+ }
+
private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse binlogSourceResponse) {
MySQLBinlogSource binlogSource = new MySQLBinlogSource();
binlogSource.setSourceName(binlogSourceResponse.getSourceName());