You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 04:08:13 UTC

[incubator-inlong] branch master updated: [INLONG-3033][Manager] Add sourceFieldName in sinkFields (#3036)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ccdaa94  [INLONG-3033][Manager] Add sourceFieldName in sinkFields (#3036)
ccdaa94 is described below

commit ccdaa9452d710cc2edc3cd34ef8183c2f1eb9d1b
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 12:08:09 2022 +0800

    [INLONG-3033][Manager] Add sourceFieldName in sinkFields (#3036)
---
 .../api/impl/DefaultInlongStreamBuilder.java       |  3 ++
 .../client/api/util/InlongStreamSinkTransfer.java  | 57 ++++++++++------------
 .../client/api/util/InlongStreamTransfer.java      |  4 ++
 3 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 9d7568f..9abf6c9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -73,6 +73,9 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         groupContext.setStreamContext(streamContext);
         this.streamContext = streamContext;
         this.inlongStream = new InlongStreamImpl(streamInfo.getName());
+        if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
+            this.inlongStream.setStreamFields(streamConf.getStreamFields());
+        }
         groupContext.setStream(this.inlongStream);
     }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 0ef37c9..496daab 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.client.api.util;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
 import org.apache.inlong.manager.client.api.DataFormat;
 import org.apache.inlong.manager.client.api.DataSeparator;
 import org.apache.inlong.manager.client.api.StreamField;
@@ -38,6 +39,7 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
@@ -107,16 +109,8 @@ public class InlongStreamSinkTransfer {
         clickHouseSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
         clickHouseSinkRequest.setEnableCreateResource(clickHouseSink.isNeedCreated() ? 1 : 0);
         if (CollectionUtils.isNotEmpty(clickHouseSink.getStreamFields())) {
-            List<SinkFieldRequest> fieldRequests = clickHouseSink.getStreamFields()
-                    .stream()
-                    .map(streamField -> {
-                        SinkFieldRequest storageFieldRequest = new SinkFieldRequest();
-                        storageFieldRequest.setFieldName(streamField.getFieldName());
-                        storageFieldRequest.setFieldType(streamField.getFieldType().toString());
-                        storageFieldRequest.setFieldComment(streamField.getFieldComment());
-                        return storageFieldRequest;
-                    })
-                    .collect(Collectors.toList());
+            List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
+                    streamInfo.getFieldList());
             clickHouseSinkRequest.setFieldList(fieldRequests);
         }
         return clickHouseSinkRequest;
@@ -172,16 +166,8 @@ public class InlongStreamSinkTransfer {
         kafkaSinkRequest.setSerializationType(kafkaSink.getDataFormat().name());
         kafkaSinkRequest.setEnableCreateResource(kafkaSink.isNeedCreated() ? 1 : 0);
         if (CollectionUtils.isNotEmpty(kafkaSink.getStreamFields())) {
-            List<SinkFieldRequest> fieldRequests = kafkaSink.getStreamFields()
-                    .stream()
-                    .map(streamField -> {
-                        SinkFieldRequest storageFieldRequest = new SinkFieldRequest();
-                        storageFieldRequest.setFieldName(streamField.getFieldName());
-                        storageFieldRequest.setFieldType(streamField.getFieldType().toString());
-                        storageFieldRequest.setFieldComment(streamField.getFieldComment());
-                        return storageFieldRequest;
-                    })
-                    .collect(Collectors.toList());
+            List<SinkFieldRequest> fieldRequests = createSinkFields(kafkaSink.getStreamFields(),
+                    streamInfo.getFieldList());
             kafkaSinkRequest.setFieldList(fieldRequests);
         }
         return kafkaSinkRequest;
@@ -244,21 +230,32 @@ public class InlongStreamSinkTransfer {
         hiveSinkRequest.setPrimaryPartition(hiveSink.getPrimaryPartition());
         hiveSinkRequest.setSecondaryPartition(hiveSink.getSecondaryPartition());
         if (CollectionUtils.isNotEmpty(hiveSink.getStreamFields())) {
-            List<SinkFieldRequest> fieldRequests = hiveSink.getStreamFields()
-                    .stream()
-                    .map(streamField -> {
-                        SinkFieldRequest storageFieldRequest = new SinkFieldRequest();
-                        storageFieldRequest.setFieldName(streamField.getFieldName());
-                        storageFieldRequest.setFieldType(streamField.getFieldType().toString());
-                        storageFieldRequest.setFieldComment(streamField.getFieldComment());
-                        return storageFieldRequest;
-                    })
-                    .collect(Collectors.toList());
+            List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
+                    streamInfo.getFieldList());
             hiveSinkRequest.setFieldList(fieldRequests);
         }
         return hiveSinkRequest;
     }
 
+    /**
+     * Builds the sink field requests for a sink, pairing each sink field with the stream
+     * (source) field found at the same position.
+     *
+     * <p>Field name, type and comment are copied from every sink field. The source field name
+     * and type are filled in only when a non-null source field exists at the same index; a null
+     * or shorter source list leaves those two attributes unset instead of failing.
+     *
+     * @param sinkFields fields declared on the sink; the callers in this class only invoke this
+     *         method after checking that the list is not empty
+     * @param sourceFields fields of the stream, matched to the sink fields by index; may be null
+     * @return one request per sink field, in the same order as {@code sinkFields}
+     */
+    private static List<SinkFieldRequest> createSinkFields(List<StreamField> sinkFields,
+            List<InlongStreamFieldInfo> sourceFields) {
+        // The stream's field list is passed straight from streamInfo.getFieldList(), which may
+        // never have been populated, so a missing list is treated like an empty one.
+        final int sourceFieldCount = sourceFields == null ? 0 : sourceFields.size();
+        List<SinkFieldRequest> sinkFieldRequests = Lists.newArrayList();
+        for (int i = 0; i < sinkFields.size(); i++) {
+            SinkFieldRequest sinkFieldRequest = new SinkFieldRequest();
+            StreamField sinkField = sinkFields.get(i);
+            sinkFieldRequest.setFieldName(sinkField.getFieldName());
+            sinkFieldRequest.setFieldType(sinkField.getFieldType().toString());
+            sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
+            // Source and sink fields are matched purely by position.
+            InlongStreamFieldInfo sourceField = i < sourceFieldCount ? sourceFields.get(i) : null;
+            if (sourceField != null) {
+                sinkFieldRequest.setSourceFieldName(sourceField.getFieldName());
+                sinkFieldRequest.setSourceFieldType(sourceField.getFieldType());
+            }
+            sinkFieldRequests.add(sinkFieldRequest);
+        }
+        return sinkFieldRequests;
+    }
+
     private static HiveSink parseHiveSink(HiveSinkResponse sinkResponse, StreamSink sink) {
         HiveSink hiveSink = new HiveSink();
         if (sink != null) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index f364adc..12988b3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.util;
 
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.InlongStreamConf;
 import org.apache.inlong.manager.client.api.StreamField;
@@ -48,6 +49,9 @@ public class InlongStreamTransfer {
         dataStreamInfo.setDailyStorage(streamConf.getDailyStorage());
         dataStreamInfo.setPeakRecords(streamConf.getPeakRecords());
         dataStreamInfo.setHavePredefinedFields(0);
+        if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
+            dataStreamInfo.setFieldList(createStreamFields(streamConf.getStreamFields(), dataStreamInfo));
+        }
         return dataStreamInfo;
     }