You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 04:08:13 UTC
[incubator-inlong] branch master updated: [INLONG-3033][Manager] Add sourceFieldName in sinkFields (#3036)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ccdaa94 [INLONG-3033][Manager] Add sourceFieldName in sinkFields (#3036)
ccdaa94 is described below
commit ccdaa9452d710cc2edc3cd34ef8183c2f1eb9d1b
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 12:08:09 2022 +0800
[INLONG-3033][Manager] Add sourceFieldName in sinkFields (#3036)
---
.../api/impl/DefaultInlongStreamBuilder.java | 3 ++
.../client/api/util/InlongStreamSinkTransfer.java | 57 ++++++++++------------
.../client/api/util/InlongStreamTransfer.java | 4 ++
3 files changed, 34 insertions(+), 30 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 9d7568f..9abf6c9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -73,6 +73,9 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
groupContext.setStreamContext(streamContext);
this.streamContext = streamContext;
this.inlongStream = new InlongStreamImpl(streamInfo.getName());
+ if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
+ this.inlongStream.setStreamFields(streamConf.getStreamFields());
+ }
groupContext.setStream(this.inlongStream);
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 0ef37c9..496daab 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.client.api.util;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.compress.utils.Lists;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.StreamField;
@@ -38,6 +39,7 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -107,16 +109,8 @@ public class InlongStreamSinkTransfer {
clickHouseSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
clickHouseSinkRequest.setEnableCreateResource(clickHouseSink.isNeedCreated() ? 1 : 0);
if (CollectionUtils.isNotEmpty(clickHouseSink.getStreamFields())) {
- List<SinkFieldRequest> fieldRequests = clickHouseSink.getStreamFields()
- .stream()
- .map(streamField -> {
- SinkFieldRequest storageFieldRequest = new SinkFieldRequest();
- storageFieldRequest.setFieldName(streamField.getFieldName());
- storageFieldRequest.setFieldType(streamField.getFieldType().toString());
- storageFieldRequest.setFieldComment(streamField.getFieldComment());
- return storageFieldRequest;
- })
- .collect(Collectors.toList());
+ List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
+ streamInfo.getFieldList());
clickHouseSinkRequest.setFieldList(fieldRequests);
}
return clickHouseSinkRequest;
@@ -172,16 +166,8 @@ public class InlongStreamSinkTransfer {
kafkaSinkRequest.setSerializationType(kafkaSink.getDataFormat().name());
kafkaSinkRequest.setEnableCreateResource(kafkaSink.isNeedCreated() ? 1 : 0);
if (CollectionUtils.isNotEmpty(kafkaSink.getStreamFields())) {
- List<SinkFieldRequest> fieldRequests = kafkaSink.getStreamFields()
- .stream()
- .map(streamField -> {
- SinkFieldRequest storageFieldRequest = new SinkFieldRequest();
- storageFieldRequest.setFieldName(streamField.getFieldName());
- storageFieldRequest.setFieldType(streamField.getFieldType().toString());
- storageFieldRequest.setFieldComment(streamField.getFieldComment());
- return storageFieldRequest;
- })
- .collect(Collectors.toList());
+ List<SinkFieldRequest> fieldRequests = createSinkFields(kafkaSink.getStreamFields(),
+ streamInfo.getFieldList());
kafkaSinkRequest.setFieldList(fieldRequests);
}
return kafkaSinkRequest;
@@ -244,21 +230,32 @@ public class InlongStreamSinkTransfer {
hiveSinkRequest.setPrimaryPartition(hiveSink.getPrimaryPartition());
hiveSinkRequest.setSecondaryPartition(hiveSink.getSecondaryPartition());
if (CollectionUtils.isNotEmpty(hiveSink.getStreamFields())) {
- List<SinkFieldRequest> fieldRequests = hiveSink.getStreamFields()
- .stream()
- .map(streamField -> {
- SinkFieldRequest storageFieldRequest = new SinkFieldRequest();
- storageFieldRequest.setFieldName(streamField.getFieldName());
- storageFieldRequest.setFieldType(streamField.getFieldType().toString());
- storageFieldRequest.setFieldComment(streamField.getFieldComment());
- return storageFieldRequest;
- })
- .collect(Collectors.toList());
+ List<SinkFieldRequest> fieldRequests = createSinkFields(streamSink.getStreamFields(),
+ streamInfo.getFieldList());
hiveSinkRequest.setFieldList(fieldRequests);
}
return hiveSinkRequest;
}
+ private static List<SinkFieldRequest> createSinkFields(List<StreamField> sinkFields,
+ List<InlongStreamFieldInfo> sourceFields) {
+ List<SinkFieldRequest> sinkFieldRequests = Lists.newArrayList();
+ for (int i = 0; i < sinkFields.size(); i++) {
+ SinkFieldRequest sinkFieldRequest = new SinkFieldRequest();
+ StreamField sinkField = sinkFields.get(i);
+ sinkFieldRequest.setFieldName(sinkField.getFieldName());
+ sinkFieldRequest.setFieldType(sinkField.getFieldType().toString());
+ sinkFieldRequest.setFieldComment(sinkField.getFieldComment());
+ if (sourceFields.size() > i && sourceFields.get(i) != null) {
+ InlongStreamFieldInfo sourceField = sourceFields.get(i);
+ sinkFieldRequest.setSourceFieldName(sourceField.getFieldName());
+ sinkFieldRequest.setSourceFieldType(sourceField.getFieldType());
+ }
+ sinkFieldRequests.add(sinkFieldRequest);
+ }
+ return sinkFieldRequests;
+ }
+
private static HiveSink parseHiveSink(HiveSinkResponse sinkResponse, StreamSink sink) {
HiveSink hiveSink = new HiveSink();
if (sink != null) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index f364adc..12988b3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.util;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
@@ -48,6 +49,9 @@ public class InlongStreamTransfer {
dataStreamInfo.setDailyStorage(streamConf.getDailyStorage());
dataStreamInfo.setPeakRecords(streamConf.getPeakRecords());
dataStreamInfo.setHavePredefinedFields(0);
+ if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
+ dataStreamInfo.setFieldList(createStreamFields(streamConf.getStreamFields(), dataStreamInfo));
+ }
return dataStreamInfo;
}