You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/05 03:54:46 UTC

[incubator-inlong] branch master updated: [INLONG-2924][Manager] Fix NPE in manager client (#2925)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ebe7992  [INLONG-2924][Manager] Fix NPE in manager client (#2925)
ebe7992 is described below

commit ebe7992efa22dca24f6ce26b25b4699719000417
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 11:54:39 2022 +0800

    [INLONG-2924][Manager] Fix NPE in manager client (#2925)
---
 .../manager/client/api/impl/InlongGroupImpl.java   | 23 +----------
 .../manager/client/api/impl/InlongStreamImpl.java  | 21 ++++++----
 .../client/api/inner/InnerGroupContext.java        |  4 +-
 .../api/util/InlongStreamSourceTransfer.java       | 48 ++++++++++++++++++++++
 4 files changed, 65 insertions(+), 31 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 63f2a26..9dbb4f3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -17,11 +17,9 @@
 
 package org.apache.inlong.manager.client.api.impl;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupConf;
@@ -36,14 +34,12 @@ import org.apache.inlong.manager.client.api.util.AssertUtil;
 import org.apache.inlong.manager.client.api.util.GsonUtil;
 import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.client.api.util.InlongParser;
-import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
@@ -206,23 +202,8 @@ public class InlongGroupImpl implements InlongGroup {
 
     private List<InlongStream> fetchDataStreams(String groupId) {
         List<FullStreamResponse> streamResponses = managerClient.listStreamInfo(groupId);
-        List<InlongStream> streamList = new ArrayList<>();
-        if (CollectionUtils.isNotEmpty(streamResponses)) {
-            streamList = streamResponses.stream().map(fullStreamResponse -> {
-                List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId,
-                        fullStreamResponse.getStreamInfo().getInlongStreamId());
-                String streamName = fullStreamResponse.getStreamInfo().getName();
-                InlongStream stream = groupContext.getStream(streamName);
-                InlongStreamImpl inlongStream = new InlongStreamImpl(fullStreamResponse, stream);
-                if (CollectionUtils.isNotEmpty(sourceListResponses)) {
-                    for (SourceListResponse response : sourceListResponses) {
-                        inlongStream.updateSource(
-                                InlongStreamSourceTransfer.parseStreamSource(response));
-                    }
-                }
-                return inlongStream;
-            }).collect(Collectors.toList());
-        }
+        List<InlongStream> streamList = streamResponses.stream() // NOTE(review): the removed isNotEmpty guard is gone; if listStreamInfo can return null (confirm), this call throws NPE
+                .map(fullStreamResponse -> new InlongStreamImpl(fullStreamResponse)).collect(Collectors.toList()); // builds one InlongStreamImpl per response, with no per-stream listSources call
         return streamList;
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 0a8bc5f..7727e69 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -32,7 +32,9 @@ import org.apache.inlong.manager.client.api.StreamSink;
 import org.apache.inlong.manager.client.api.StreamSource;
 import org.apache.inlong.manager.client.api.util.AssertUtil;
 import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -50,11 +52,9 @@ public class InlongStreamImpl extends InlongStream {
 
     private List<StreamField> streamFields;
 
-    public InlongStreamImpl(FullStreamResponse fullStreamResponse, InlongStream curStreamInfo) {
+    public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
         InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
         this.name = streamInfo.getName();
-        this.streamSinks = curStreamInfo.getSinks();
-        this.streamSources = curStreamInfo.getSources();
         List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
         this.streamFields = streamFieldInfos.stream().map(streamFieldInfo -> {
             return new StreamField(streamFieldInfo.getId(),
@@ -66,14 +66,19 @@ public class InlongStreamImpl extends InlongStream {
         }).collect(Collectors.toList());
         List<SinkResponse> sinkList = fullStreamResponse.getSinkInfo();
         if (CollectionUtils.isNotEmpty(sinkList)) {
-            Map<String, StreamSink> streamSinks = sinkList.stream()
+            this.streamSinks = sinkList.stream()
                     .map(sinkResponse -> {
-                        String sinkName = sinkResponse.getSinkName();
-                        StreamSink streamSink = this.streamSinks.get(sinkName);
-                        return InlongStreamSinkTransfer.parseStreamSink(sinkResponse, streamSink);
+                        return InlongStreamSinkTransfer.parseStreamSink(sinkResponse, null);
                     }).collect(Collectors.toMap(StreamSink::getSinkName, streamSink -> streamSink));
-            this.streamSinks = streamSinks;
         }
+        List<SourceResponse> sourceList = fullStreamResponse.getSourceInfo();
+        if (CollectionUtils.isNotEmpty(sourceList)) {
+            this.streamSources = sourceList.stream()
+                    .map(sourceResponse -> {
+                        return InlongStreamSourceTransfer.parseStreamSource(sourceResponse);
+                    }).collect(Collectors.toMap(StreamSource::getSourceName, streamSource -> streamSource));
+        }
+
     }
 
     public InlongStreamImpl(String name) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
index 24cf764..fbb8312 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
@@ -37,9 +37,9 @@ public class InnerGroupContext {
 
     private InlongGroupInfo groupInfo;
 
-    private Map<String, InnerStreamContext> streamContextMap;
+    private Map<String, InnerStreamContext> streamContextMap = Maps.newHashMap();
 
-    private Map<String, InlongStream> streamMap;
+    private Map<String, InlongStream> streamMap = Maps.newHashMap();
 
     private Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>> initMsg;
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 82ae09e..195be65 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -29,10 +29,13 @@ import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
 public class InlongStreamSourceTransfer {
@@ -49,6 +52,18 @@ public class InlongStreamSourceTransfer {
         }
     }
 
+    public static StreamSource parseStreamSource(SourceResponse sourceResponse) { // Converts a SourceResponse into the matching client-side StreamSource.
+        String type = sourceResponse.getSourceType();
+        SourceType sourceType = SourceType.forType(type); // resolve the type string to a SourceType constant
+        if (sourceType == SourceType.KAFKA && sourceResponse instanceof KafkaSourceResponse) { // the declared type and the runtime class must both match
+            return parseKafkaSource((KafkaSourceResponse) sourceResponse);
+        }
+        if (sourceType == SourceType.BINLOG && sourceResponse instanceof BinlogSourceResponse) { // same double check for binlog sources
+            return parseMySQLBinlogSource((BinlogSourceResponse) sourceResponse);
+        }
+        throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType)); // reached for any other type, or when the runtime class does not match the declared type
+    }
+
     public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
         String type = sourceListResponse.getSourceType();
         SourceType sourceType = SourceType.forType(type);
@@ -61,6 +76,21 @@ public class InlongStreamSourceTransfer {
         throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
     }
 
+    private static KafkaSource parseKafkaSource(KafkaSourceResponse kafkaSourceResponse) { // Copies the fields of a KafkaSourceResponse into a new KafkaSource.
+        KafkaSource kafkaSource = new KafkaSource();
+        kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
+        kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId()); // the response's groupId is used as the consumer group
+        DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType()); // the serialization type name is looked up as a DataFormat
+        kafkaSource.setDataFormat(dataFormat);
+        kafkaSource.setTopic(kafkaSourceResponse.getTopic());
+        kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
+        kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
+        kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
+        kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+        kafkaSource.setSyncType(SyncType.FULL); // sync type is fixed to FULL rather than read from the response
+        return kafkaSource;
+    }
+
     private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaSourceResponse) {
         KafkaSource kafkaSource = new KafkaSource();
         kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
@@ -76,6 +106,24 @@ public class InlongStreamSourceTransfer {
         return kafkaSource;
     }
 
+    private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceResponse binlogSourceResponse) { // Copies the fields of a BinlogSourceResponse into a new MySQLBinlogSource.
+        MySQLBinlogSource binlogSource = new MySQLBinlogSource();
+        binlogSource.setSourceName(binlogSourceResponse.getSourceName());
+        binlogSource.setHostname(binlogSourceResponse.getHostname());
+        binlogSource.setDataFormat(DataFormat.NONE); // data format is fixed to NONE rather than read from the response
+        binlogSource.setPort(binlogSourceResponse.getPort());
+        DefaultAuthentication defaultAuthentication = new DefaultAuthentication( // user and password come straight from the response
+                binlogSourceResponse.getUser(),
+                binlogSourceResponse.getPassword());
+        binlogSource.setAllMigration(binlogSourceResponse.isAllMigration());
+        binlogSource.setAuthentication(defaultAuthentication);
+        binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
+        binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
+        List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist()); // comma-separated whitelist becomes the db name list; NOTE(review): presumably Guava Splitter, which rejects null input — confirm whitelist is always set
+        binlogSource.setDbNames(dbs);
+        return binlogSource;
+    }
+
     private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse binlogSourceResponse) {
         MySQLBinlogSource binlogSource = new MySQLBinlogSource();
         binlogSource.setSourceName(binlogSourceResponse.getSourceName());