You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/20 02:20:23 UTC
[inlong] branch master updated: [INLONG-5103][Manager] Add constant field support for stream source and transform node (#5106)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f6f9007a1 [INLONG-5103][Manager] Add constant field support for stream source and transform node (#5106)
f6f9007a1 is described below
commit f6f9007a1ac5e95c6a371b36d4de6f3d7e8bad06
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Wed Jul 20 10:20:19 2022 +0800
[INLONG-5103][Manager] Add constant field support for stream source and transform node (#5106)
---
.../service/sort/DefaultSortConfigOperator.java | 69 ++--
.../service/sort/util/ExtractNodeUtils.java | 143 ++++----
.../service/sort/util/FieldRelationUtils.java | 62 +++-
.../manager/service/sort/util/LoadNodeUtils.java | 358 ++++++++-------------
.../service/sort/util/TransformNodeUtils.java | 26 +-
5 files changed, 285 insertions(+), 373 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
index cf77c6c28..13ad3fc65 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
@@ -81,12 +82,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
GroupInfo configInfo;
// if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
- if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
- configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
- } else {
- configInfo = this.getNormalGroupInfo(groupInfo, streamInfos);
- }
-
+ configInfo = getGroupInfo(groupInfo, streamInfos);
String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
if (isStream) {
this.addToStreamExt(streamInfos, dataflow);
@@ -99,7 +95,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
}
}
- private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
// get source info
Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
// get sink info
@@ -114,12 +110,16 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
List<StreamInfo> sortStreamInfos = new ArrayList<>();
for (InlongStreamInfo inlongStream : streamInfoList) {
String streamId = inlongStream.getInlongStreamId();
-
+ Map<String, StreamField> constantFieldMap = new HashMap<>();
+ inlongStream.getSourceList()
+ .forEach(s -> parseConstantFieldMap(s.getSourceName(), s.getFieldList(), constantFieldMap));
List<TransformResponse> transformResponseList = transformMap.get(streamId);
+ transformResponseList
+ .forEach(s -> parseConstantFieldMap(s.getTransformName(), s.getFieldList(), constantFieldMap));
List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
- transformResponseList, sinkMap.get(streamId));
- StreamInfo streamInfo = new StreamInfo(streamId, nodes,
- NodeRelationUtils.createNodeRelationsForStream(inlongStream));
+ transformResponseList, sinkMap.get(streamId), constantFieldMap);
+ List<NodeRelation> relations = NodeRelationUtils.createNodeRelationsForStream(inlongStream);
+ StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
sortStreamInfos.add(streamInfo);
// rebuild joinerNode relation
@@ -130,44 +130,31 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
}
/**
- * Get Sort GroupInfo of normal inlong group.
+ * Collect constant fields (those with a non-null field value) into the constant field map
*
- * @see org.apache.inlong.sort.protocol.GroupInfo
+ * @param nodeId The node id
+ * @param fields The stream fields
+ * @param constantFieldMap The constant field map
*/
- private GroupInfo getNormalGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
- // get source info
- Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
- // get sink info
- Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
-
- // create StreamInfo for Sort protocol
- List<StreamInfo> sortStreamInfos = new ArrayList<>();
- for (InlongStreamInfo inlongStream : streamInfoList) {
- String streamId = inlongStream.getInlongStreamId();
- List<StreamSource> sources = sourceMap.get(streamId);
- List<StreamSink> sinks = sinkMap.get(streamId);
- StreamInfo sortStream = new StreamInfo(streamId,
- this.createNodesForStream(sources, sinks),
- this.createNodeRelationsForStream(sources, sinks));
- sortStreamInfos.add(sortStream);
+ private void parseConstantFieldMap(String nodeId, List<StreamField> fields,
+ Map<String, StreamField> constantFieldMap) {
+ if (CollectionUtils.isEmpty(fields)) {
+ return;
+ }
+ for (StreamField field : fields) {
+ if (field.getFieldValue() != null) {
+ constantFieldMap.put(String.format("%s-%s", nodeId, field.getFieldName()), field);
+ }
}
-
- return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
- }
-
- private List<Node> createNodesForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
- List<Node> nodes = Lists.newArrayList();
- nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
- nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
- return nodes;
}
private List<Node> createNodesForStream(List<StreamSource> sourceInfos,
- List<TransformResponse> transformResponses, List<StreamSink> streamSinks) {
+ List<TransformResponse> transformResponses, List<StreamSink> streamSinks,
+ Map<String, StreamField> constantFieldMap) {
List<Node> nodes = Lists.newArrayList();
nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
- nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses));
- nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
+ nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
+ nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks, constantFieldMap));
return nodes;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 22d82b3ac..42f1b1128 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -109,8 +109,6 @@ public class ExtractNodeUtils {
* @return MySql extract node info
*/
public static MySqlExtractNode createExtractNode(MySQLBinlogSource binlogSource) {
- final String id = binlogSource.getSourceName();
- final String name = binlogSource.getSourceName();
final String database = binlogSource.getDatabaseWhiteList();
final String primaryKey = binlogSource.getPrimaryKey();
final String hostName = binlogSource.getHostname();
@@ -123,16 +121,12 @@ public class ExtractNodeUtils {
}
String tables = binlogSource.getTableWhiteList();
final List<String> tableNames = Splitter.on(",").splitToList(tables);
- final List<StreamField> streamFields = binlogSource.getFieldList();
- final List<FieldInfo> fieldInfos = streamFields.stream()
- .map(streamField -> FieldInfoUtils.parseStreamFieldInfo(streamField, name))
- .collect(Collectors.toList());
+ List<FieldInfo> fieldInfos = parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
final String serverTimeZone = binlogSource.getServerTimezone();
boolean incrementalSnapshotEnabled = true;
// TODO Needs to be configurable for those parameters
- Map<String, String> properties = binlogSource.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ Map<String, String> properties = parseProperties(binlogSource.getProperties());
if (binlogSource.isAllMigration()) {
// Unique properties when migrate all tables in database
incrementalSnapshotEnabled = false;
@@ -142,8 +136,8 @@ public class ExtractNodeUtils {
incrementalSnapshotEnabled = false;
properties.put("scan.incremental.snapshot.enabled", "false");
}
- return new MySqlExtractNode(id,
- name,
+ return new MySqlExtractNode(binlogSource.getSourceName(),
+ binlogSource.getSourceName(),
fieldInfos,
null,
properties,
@@ -166,12 +160,7 @@ public class ExtractNodeUtils {
* @return Kafka extract node info
*/
public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
- String id = kafkaSource.getSourceName();
- String name = kafkaSource.getSourceName();
- List<StreamField> streamFields = kafkaSource.getFieldList();
- List<FieldInfo> fieldInfos = streamFields.stream()
- .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
- .collect(Collectors.toList());
+ List<FieldInfo> fieldInfos = parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
String topic = kafkaSource.getTopic();
String bootstrapServers = kafkaSource.getBootstrapServers();
Format format;
@@ -210,11 +199,10 @@ public class ExtractNodeUtils {
}
final String primaryKey = kafkaSource.getPrimaryKey();
String groupId = kafkaSource.getGroupId();
- Map<String, String> properties = kafkaSource.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ Map<String, String> properties = parseProperties(kafkaSource.getProperties());
String partitionOffset = kafkaSource.getPartitionOffsets();
- return new KafkaExtractNode(id,
- name,
+ return new KafkaExtractNode(kafkaSource.getSourceName(),
+ kafkaSource.getSourceName(),
fieldInfos,
null,
properties,
@@ -235,12 +223,7 @@ public class ExtractNodeUtils {
* @return Pulsar extract node info
*/
public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
- String id = pulsarSource.getSourceName();
- String name = pulsarSource.getSourceName();
- List<StreamField> streamFields = pulsarSource.getFieldList();
- List<FieldInfo> fieldInfos = streamFields.stream()
- .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
- .collect(Collectors.toList());
+ List<FieldInfo> fieldInfos = parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
String fullTopicName =
pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
@@ -274,10 +257,9 @@ public class ExtractNodeUtils {
final String primaryKey = pulsarSource.getPrimaryKey();
final String serviceUrl = pulsarSource.getServiceUrl();
final String adminUrl = pulsarSource.getAdminUrl();
- Map<String, String> properties = pulsarSource.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
- return new PulsarExtractNode(id,
- name,
+ Map<String, String> properties = parseProperties(pulsarSource.getProperties());
+ return new PulsarExtractNode(pulsarSource.getSourceName(),
+ pulsarSource.getSourceName(),
fieldInfos,
null,
properties,
@@ -296,20 +278,14 @@ public class ExtractNodeUtils {
* @return PostgreSQL extract node info
*/
public static PostgresExtractNode createExtractNode(PostgreSQLSource postgreSQLSource) {
- List<StreamField> streamFields = postgreSQLSource.getFieldList();
- String id = postgreSQLSource.getSourceName();
- String name = postgreSQLSource.getSourceName();
- List<FieldInfo> fields = streamFields.stream()
- .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
- .collect(Collectors.toList());
- Map<String, String> properties = postgreSQLSource.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
- return new PostgresExtractNode(id, name, fields, null, properties,
- postgreSQLSource.getPrimaryKey(), postgreSQLSource.getTableNameList(),
- postgreSQLSource.getHostname(), postgreSQLSource.getUsername(),
- postgreSQLSource.getPassword(), postgreSQLSource.getDatabase(),
- postgreSQLSource.getSchema(), postgreSQLSource.getPort(),
- postgreSQLSource.getDecodingPluginName());
+ List<FieldInfo> fieldInfos = parseFieldInfos(postgreSQLSource.getFieldList(), postgreSQLSource.getSourceName());
+ Map<String, String> properties = parseProperties(postgreSQLSource.getProperties());
+ return new PostgresExtractNode(postgreSQLSource.getSourceName(), postgreSQLSource.getSourceName(),
+ fieldInfos, null, properties, postgreSQLSource.getPrimaryKey(),
+ postgreSQLSource.getTableNameList(), postgreSQLSource.getHostname(),
+ postgreSQLSource.getUsername(), postgreSQLSource.getPassword(),
+ postgreSQLSource.getDatabase(), postgreSQLSource.getSchema(),
+ postgreSQLSource.getPort(), postgreSQLSource.getDecodingPluginName());
}
/**
@@ -319,19 +295,13 @@ public class ExtractNodeUtils {
* @return oracle extract node info
*/
public static OracleExtractNode createExtractNode(OracleSource source) {
- String name = source.getSourceName();
- List<StreamField> streamFieldInfos = source.getFieldList();
- final List<FieldInfo> fieldInfos = streamFieldInfos.stream()
- .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
- .collect(Collectors.toList());
-
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
ScanStartUpMode scanStartupMode = StringUtils.isBlank(source.getScanStartupMode())
? null : ScanStartUpMode.forName(source.getScanStartupMode());
- Map<String, String> properties = source.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ Map<String, String> properties = parseProperties(source.getProperties());
return new OracleExtractNode(
- name,
- name,
+ source.getSourceName(),
+ source.getSourceName(),
fieldInfos,
null,
properties,
@@ -354,17 +324,11 @@ public class ExtractNodeUtils {
* @return SQLServer extract node info
*/
public static SqlServerExtractNode createExtractNode(SQLServerSource source) {
- String name = source.getSourceName();
- List<StreamField> streamFields = source.getFieldList();
- List<FieldInfo> fieldInfos = streamFields.stream()
- .map(fieldInfo -> FieldInfoUtils.parseStreamFieldInfo(fieldInfo, name))
- .collect(Collectors.toList());
-
- Map<String, String> properties = source.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+ Map<String, String> properties = parseProperties(source.getProperties());
return new SqlServerExtractNode(
- name,
- name,
+ source.getSourceName(),
+ source.getSourceName(),
fieldInfos,
null,
properties,
@@ -387,16 +351,11 @@ public class ExtractNodeUtils {
* @return MongoDB extract node info
*/
public static MongoExtractNode createExtractNode(MongoDBSource source) {
- String name = source.getSourceName();
- List<StreamField> streamFields = source.getFieldList();
- List<FieldInfo> fieldInfos = streamFields.stream()
- .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
- .collect(Collectors.toList());
- Map<String, String> properties = source.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+ Map<String, String> properties = parseProperties(source.getProperties());
return new MongoExtractNode(
- name,
- name,
+ source.getSourceName(),
+ source.getSourceName(),
fieldInfos,
null,
properties,
@@ -415,16 +374,11 @@ public class ExtractNodeUtils {
* @return TubeMQ extract node info
*/
public static TubeMQExtractNode createExtractNode(TubeMQSource source) {
- String name = source.getSourceName();
- List<StreamField> streamFields = source.getFieldList();
- List<FieldInfo> fieldInfos = streamFields.stream()
- .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
- .collect(Collectors.toList());
- Map<String, String> properties = source.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+ Map<String, String> properties = parseProperties(source.getProperties());
return new TubeMQExtractNode(
- name,
- name,
+ source.getSourceName(),
+ source.getSourceName(),
fieldInfos,
null,
properties,
@@ -437,4 +391,29 @@ public class ExtractNodeUtils {
);
}
+ /**
+ * Parse stream fields into FieldInfos, skipping constant fields
+ *
+ * @param streamFields The stream fields
+ * @param nodeId The node id
+ * @return FieldInfo list
+ */
+ private static List<FieldInfo> parseFieldInfos(List<StreamField> streamFields, String nodeId) {
+ // Exclude constant fields, i.e. fields that carry a non-null field value
+ return streamFields.stream().filter(s -> s.getFieldValue() == null)
+ .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Convert property values to their string form
+ *
+ * @param properties The properties with string key and object value
+ * @return The properties with string key and string value
+ */
+ private static Map<String, String> parseProperties(Map<String, Object> properties) {
+ return properties.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
index 5793db886..a672a98fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
@@ -33,10 +33,12 @@ import org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefiniti
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.util.StreamParseUtils;
import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.transformation.CascadeFunction;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
@@ -56,7 +58,8 @@ public class FieldRelationUtils {
/**
* Create relation of fields.
*/
- public static List<FieldRelation> createFieldRelations(TransformResponse transformResponse) {
+ public static List<FieldRelation> createFieldRelations(TransformResponse transformResponse,
+ Map<String, StreamField> constantFieldMap) {
TransformType transformType = TransformType.forType(transformResponse.getTransformType());
TransformDefinition transformDefinition = StreamParseUtils.parseTransformDefinition(
transformResponse.getTransformDefinition(), transformType);
@@ -66,15 +69,17 @@ public class FieldRelationUtils {
switch (transformType) {
case SPLITTER:
SplitterDefinition splitterDefinition = (SplitterDefinition) transformDefinition;
- return createSplitterFieldRelations(fieldList, transformName, splitterDefinition, preNodes);
+ return createSplitterFieldRelations(fieldList, transformName, splitterDefinition,
+ preNodes, constantFieldMap);
case STRING_REPLACER:
StringReplacerDefinition replacerDefinition = (StringReplacerDefinition) transformDefinition;
- return createReplacerFieldRelations(fieldList, transformName, replacerDefinition, preNodes);
+ return createReplacerFieldRelations(fieldList, transformName,
+ replacerDefinition, preNodes, constantFieldMap);
case DE_DUPLICATION:
case FILTER:
- return createFieldRelations(fieldList, transformName);
+ return createFieldRelations(fieldList, transformName, constantFieldMap);
case JOINER:
- return createJoinerFieldRelations(fieldList, transformName);
+ return createJoinerFieldRelations(fieldList, transformName, constantFieldMap);
default:
throw new UnsupportedOperationException(
String.format("Unsupported transformType=%s", transformType));
@@ -84,12 +89,25 @@ public class FieldRelationUtils {
/**
* Create relation of fields.
*/
- private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName) {
+ private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName,
+ Map<String, StreamField> constantFieldMap) {
return fieldList.stream()
.map(FieldInfoUtils::parseStreamField)
.map(fieldInfo -> {
- FieldInfo inputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getNodeId(),
- fieldInfo.getFormatInfo());
+ FunctionParam inputField;
+ String fieldKey = String.format("%s-%s", fieldInfo.getNodeId(), fieldInfo.getName());
+ StreamField constantField = constantFieldMap.get(fieldKey);
+ if (constantField != null) {
+ if (fieldInfo.getFormatInfo() != null
+ && fieldInfo.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
+ inputField = new StringConstantParam(constantField.getFieldValue());
+ } else {
+ inputField = new ConstantParam(constantField.getFieldValue());
+ }
+ } else {
+ inputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getNodeId(),
+ fieldInfo.getFormatInfo());
+ }
FieldInfo outputField = new FieldInfo(fieldInfo.getName(), transformName,
fieldInfo.getFormatInfo());
return new FieldRelation(inputField, outputField);
@@ -99,13 +117,26 @@ public class FieldRelationUtils {
/**
* Create relation of fields in join function.
*/
- private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName) {
+ private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName,
+ Map<String, StreamField> constantFieldMap) {
return fieldList.stream()
.map(streamField -> {
FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
streamField.getFieldType(), streamField.getFieldFormat());
- FieldInfo inputField = new FieldInfo(streamField.getOriginFieldName(),
- streamField.getOriginNodeName(), formatInfo);
+ FunctionParam inputField;
+ String fieldKey = String.format("%s-%s", streamField.getOriginNodeName(),
+ streamField.getOriginFieldName());
+ StreamField constantField = constantFieldMap.get(fieldKey);
+ if (constantField != null) {
+ if (formatInfo != null && formatInfo.getTypeInfo() == StringTypeInfo.INSTANCE) {
+ inputField = new StringConstantParam(constantField.getFieldValue());
+ } else {
+ inputField = new ConstantParam(constantField.getFieldValue());
+ }
+ } else {
+ inputField = new FieldInfo(streamField.getOriginFieldName(),
+ streamField.getOriginNodeName(), formatInfo);
+ }
FieldInfo outputField = new FieldInfo(streamField.getFieldName(),
transformName, formatInfo);
return new FieldRelation(inputField, outputField);
@@ -117,7 +148,8 @@ public class FieldRelationUtils {
*/
private static List<FieldRelation> createSplitterFieldRelations(
List<StreamField> fieldList, String transformName,
- SplitterDefinition splitterDefinition, String preNodes) {
+ SplitterDefinition splitterDefinition, String preNodes,
+ Map<String, StreamField> constantFieldMap) {
Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
String preNode = preNodes.split(",")[0];
List<SplitRule> splitRules = splitterDefinition.getSplitRules();
@@ -131,7 +163,7 @@ public class FieldRelationUtils {
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo -> !splitFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
- fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+ fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
return fieldRelations;
}
@@ -139,7 +171,7 @@ public class FieldRelationUtils {
* Create relation of fields in replace function.
*/
private static List<FieldRelation> createReplacerFieldRelations(List<StreamField> fieldList, String transformName,
- StringReplacerDefinition replacerDefinition, String preNodes) {
+ StringReplacerDefinition replacerDefinition, String preNodes, Map<String, StreamField> constantFieldMap) {
Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
String preNode = preNodes.split(",")[0];
List<ReplaceRule> replaceRules = replacerDefinition.getReplaceRules();
@@ -151,7 +183,7 @@ public class FieldRelationUtils {
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo -> !replaceFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
- fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+ fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
return fieldRelations;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 0e0c135cd..49ad68231 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -41,6 +41,8 @@ import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSink;
import org.apache.inlong.manager.common.pojo.sink.postgresql.PostgreSQLSink;
import org.apache.inlong.manager.common.pojo.sink.sqlserver.SQLServerSink;
import org.apache.inlong.manager.common.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -64,7 +66,10 @@ import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
import java.util.HashMap;
import java.util.List;
@@ -79,47 +84,56 @@ public class LoadNodeUtils {
/**
* Create nodes of data load.
*/
- public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks) {
+ public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks,
+ Map<String, StreamField> constantFieldMap) {
if (CollectionUtils.isEmpty(streamSinks)) {
return Lists.newArrayList();
}
- return streamSinks.stream().map(LoadNodeUtils::createLoadNode).collect(Collectors.toList());
+ return streamSinks.stream()
+ .map(s -> LoadNodeUtils.createLoadNode(s, constantFieldMap)).collect(Collectors.toList());
}
/**
* Create load node from the stream sink info.
*/
- public static LoadNode createLoadNode(StreamSink streamSink) {
+ public static LoadNode createLoadNode(StreamSink streamSink, Map<String, StreamField> constantFieldMap) {
+ List<FieldInfo> fieldInfos = streamSink.getSinkFieldList().stream()
+ .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, streamSink.getSinkName()))
+ .collect(Collectors.toList());
+ List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(),
+ streamSink.getSinkName(), constantFieldMap);
+ Map<String, String> properties = streamSink.getProperties().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
SinkType sinkType = SinkType.forType(streamSink.getSinkType());
switch (sinkType) {
case KAFKA:
- return createLoadNode((KafkaSink) streamSink);
+ return createLoadNode((KafkaSink) streamSink, fieldInfos, fieldRelations, properties);
case HIVE:
- return createLoadNode((HiveSink) streamSink);
+ return createLoadNode((HiveSink) streamSink, fieldInfos, fieldRelations, properties);
case HBASE:
- return createLoadNode((HBaseSink) streamSink);
+ return createLoadNode((HBaseSink) streamSink, fieldInfos, fieldRelations, properties);
case POSTGRES:
- return createLoadNode((PostgreSQLSink) streamSink);
+ return createLoadNode((PostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
case CLICKHOUSE:
- return createLoadNode((ClickHouseSink) streamSink);
+ return createLoadNode((ClickHouseSink) streamSink, fieldInfos, fieldRelations, properties);
case ICEBERG:
- return createLoadNode((IcebergSink) streamSink);
+ return createLoadNode((IcebergSink) streamSink, fieldInfos, fieldRelations, properties);
case SQLSERVER:
- return createLoadNode((SQLServerSink) streamSink);
+ return createLoadNode((SQLServerSink) streamSink, fieldInfos, fieldRelations, properties);
case ELASTICSEARCH:
- return createLoadNode((ElasticsearchSink) streamSink);
+ return createLoadNode((ElasticsearchSink) streamSink, fieldInfos, fieldRelations, properties);
case HDFS:
- return createLoadNode((HDFSSink) streamSink);
+ return createLoadNode((HDFSSink) streamSink, fieldInfos, fieldRelations, properties);
case GREENPLUM:
- return createLoadNode((GreenplumSink) streamSink);
+ return createLoadNode((GreenplumSink) streamSink, fieldInfos, fieldRelations, properties);
case MYSQL:
- return createLoadNode((MySQLSink) streamSink);
+ return createLoadNode((MySQLSink) streamSink, fieldInfos, fieldRelations, properties);
case ORACLE:
- return createLoadNode((OracleSink) streamSink);
+ return createLoadNode((OracleSink) streamSink, fieldInfos, fieldRelations, properties);
case TDSQLPOSTGRESQL:
- return createLoadNode((TDSQLPostgreSQLSink) streamSink);
+ return createLoadNode((TDSQLPostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
case DLCICEBERG:
- return createLoadNode((DLCIcebergSink) streamSink);
+ return createLoadNode((DLCIcebergSink) streamSink, fieldInfos, fieldRelations, properties);
default:
throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
}
@@ -128,16 +142,8 @@ public class LoadNodeUtils {
/**
* Create load node of Kafka.
*/
- public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink) {
- String id = kafkaSink.getSinkName();
- String name = kafkaSink.getSinkName();
- List<SinkField> fieldList = kafkaSink.getSinkFieldList();
- List<FieldInfo> fieldInfos = fieldList.stream()
- .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = kafkaSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
Integer sinkParallelism = null;
if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
@@ -165,8 +171,8 @@ public class LoadNodeUtils {
}
return new KafkaLoadNode(
- id,
- name,
+ kafkaSink.getSinkName(),
+ kafkaSink.getSinkName(),
fieldInfos,
fieldRelations,
Lists.newArrayList(),
@@ -183,27 +189,19 @@ public class LoadNodeUtils {
/**
* Create load node of Hive.
*/
- public static HiveLoadNode createLoadNode(HiveSink hiveSink) {
- String id = hiveSink.getSinkName();
- String name = hiveSink.getSinkName();
- List<SinkField> fieldList = hiveSink.getSinkFieldList();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = hiveSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ public static HiveLoadNode createLoadNode(HiveSink hiveSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
List<FieldInfo> partitionFields = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(hiveSink.getPartitionFieldList())) {
partitionFields = hiveSink.getPartitionFieldList().stream()
- .map(partitionField -> new FieldInfo(partitionField.getFieldName(), name,
+ .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hiveSink.getSinkName(),
FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
partitionField.getFieldFormat()))).collect(Collectors.toList());
}
return new HiveLoadNode(
- id,
- name,
- fields,
+ hiveSink.getSinkName(),
+ hiveSink.getSinkName(),
+ fieldInfos,
fieldRelations,
Lists.newArrayList(),
null,
@@ -222,20 +220,12 @@ public class LoadNodeUtils {
/**
* Create load node of HBase.
*/
- public static HbaseLoadNode createLoadNode(HBaseSink hbaseSink) {
- String id = hbaseSink.getSinkName();
- String name = hbaseSink.getSinkName();
- List<SinkField> fieldList = hbaseSink.getSinkFieldList();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = hbaseSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ public static HbaseLoadNode createLoadNode(HBaseSink hbaseSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new HbaseLoadNode(
- id,
- name,
- fields,
+ hbaseSink.getSinkName(),
+ hbaseSink.getSinkName(),
+ fieldInfos,
fieldRelations,
Lists.newArrayList(),
null,
@@ -255,23 +245,17 @@ public class LoadNodeUtils {
/**
* Create load node of PostgreSQL.
*/
- public static PostgresLoadNode createLoadNode(PostgreSQLSink postgreSQLSink) {
- List<SinkField> fieldList = postgreSQLSink.getSinkFieldList();
- String name = postgreSQLSink.getSinkName();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-
+ public static PostgresLoadNode createLoadNode(PostgreSQLSink postgreSQLSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new PostgresLoadNode(
- name,
- name,
- fields,
+ postgreSQLSink.getSinkName(),
+ postgreSQLSink.getSinkName(),
+ fieldInfos,
fieldRelations,
null,
null,
- 1,
null,
+ properties,
postgreSQLSink.getJdbcUrl(),
postgreSQLSink.getUsername(),
postgreSQLSink.getPassword(),
@@ -283,23 +267,17 @@ public class LoadNodeUtils {
/**
* Create load node of ClickHouse.
*/
- public static ClickHouseLoadNode createLoadNode(ClickHouseSink ckSink) {
- List<SinkField> sinkFields = ckSink.getSinkFieldList();
- String name = ckSink.getSinkName();
- List<FieldInfo> fields = sinkFields.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(sinkFields, name);
-
+ public static ClickHouseLoadNode createLoadNode(ClickHouseSink ckSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new ClickHouseLoadNode(
- name,
- name,
- fields,
+ ckSink.getSinkName(),
+ ckSink.getSinkName(),
+ fieldInfos,
fieldRelations,
null,
null,
- 1,
null,
+ properties,
ckSink.getTableName(),
ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
ckSink.getUsername(),
@@ -310,26 +288,17 @@ public class LoadNodeUtils {
/**
* Create load node of Iceberg.
*/
- public static IcebergLoadNode createLoadNode(IcebergSink icebergSink) {
- String id = icebergSink.getSinkName();
- String name = icebergSink.getSinkName();
+ public static IcebergLoadNode createLoadNode(IcebergSink icebergSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
- List<SinkField> sinkFields = icebergSink.getSinkFieldList();
- List<FieldInfo> fields = sinkFields.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
- Map<String, String> properties = icebergSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
return new IcebergLoadNode(
- id,
- name,
- fields,
- fieldRelationShips,
+ icebergSink.getSinkName(),
+ icebergSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
null,
null,
- 1,
properties,
icebergSink.getDbName(),
icebergSink.getTableName(),
@@ -343,21 +312,12 @@ public class LoadNodeUtils {
/**
* Create load node of SQLServer.
*/
- public static SqlServerLoadNode createLoadNode(SQLServerSink sqlServerSink) {
- final String id = sqlServerSink.getSinkName();
- final String name = sqlServerSink.getSinkName();
- final List<SinkField> fieldList = sqlServerSink.getSinkFieldList();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = sqlServerSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+ public static SqlServerLoadNode createLoadNode(SQLServerSink sqlServerSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new SqlServerLoadNode(
- id,
- name,
- fields,
+ sqlServerSink.getSinkName(),
+ sqlServerSink.getSinkName(),
+ fieldInfos,
fieldRelations,
null,
null,
@@ -375,21 +335,12 @@ public class LoadNodeUtils {
/**
* Create Elasticsearch load node
*/
- public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsearchSink) {
- final String id = elasticsearchSink.getSinkName();
- final String name = elasticsearchSink.getSinkName();
- final List<SinkField> fieldList = elasticsearchSink.getSinkFieldList();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = elasticsearchSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+ public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsearchSink,
+ List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new ElasticsearchLoadNode(
- id,
- name,
- fields,
+ elasticsearchSink.getSinkName(),
+ elasticsearchSink.getSinkName(),
+ fieldInfos,
fieldRelations,
null,
null,
@@ -408,29 +359,21 @@ public class LoadNodeUtils {
/**
* Create load node of HDFS.
*/
- public static FileSystemLoadNode createLoadNode(HDFSSink hdfsSink) {
- String id = hdfsSink.getSinkName();
- String name = hdfsSink.getSinkName();
- List<SinkField> fieldList = hdfsSink.getSinkFieldList();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = hdfsSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+ public static FileSystemLoadNode createLoadNode(HDFSSink hdfsSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
List<FieldInfo> partitionFields = Lists.newArrayList();
if (CollectionUtils.isNotEmpty(hdfsSink.getPartitionFieldList())) {
partitionFields = hdfsSink.getPartitionFieldList().stream()
- .map(partitionField -> new FieldInfo(partitionField.getFieldName(), name,
+ .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hdfsSink.getSinkName(),
FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
partitionField.getFieldFormat())))
.collect(Collectors.toList());
}
return new FileSystemLoadNode(
- id,
- name,
- fields,
+ hdfsSink.getSinkName(),
+ hdfsSink.getSinkName(),
+ fieldInfos,
fieldRelations,
Lists.newArrayList(),
hdfsSink.getDataPath(),
@@ -445,25 +388,16 @@ public class LoadNodeUtils {
/**
* Create greenplum load node
*/
- public static GreenplumLoadNode createLoadNode(GreenplumSink greenplumSink) {
- String id = greenplumSink.getSinkName();
- String name = greenplumSink.getSinkName();
- List<SinkField> fieldList = greenplumSink.getSinkFieldList();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = greenplumSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+ public static GreenplumLoadNode createLoadNode(GreenplumSink greenplumSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new GreenplumLoadNode(
- id,
- name,
- fields,
+ greenplumSink.getSinkName(),
+ greenplumSink.getSinkName(),
+ fieldInfos,
fieldRelations,
null,
null,
- 1,
+ null,
properties,
greenplumSink.getJdbcUrl(),
greenplumSink.getUsername(),
@@ -475,21 +409,12 @@ public class LoadNodeUtils {
/**
* Create load node of MySQL.
*/
- public static MySqlLoadNode createLoadNode(MySQLSink mysqlSink) {
- String id = mysqlSink.getSinkName();
- String name = mysqlSink.getSinkName();
- List<SinkField> fieldList = mysqlSink.getSinkFieldList();
- List<FieldInfo> fields = fieldList.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
- Map<String, String> properties = mysqlSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+ public static MySqlLoadNode createLoadNode(MySQLSink mysqlSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new MySqlLoadNode(
- id,
- name,
- fields,
+ mysqlSink.getSinkName(),
+ mysqlSink.getSinkName(),
+ fieldInfos,
fieldRelations,
Lists.newArrayList(),
null,
@@ -505,25 +430,16 @@ public class LoadNodeUtils {
/**
* Create load node of ORACLE.
*/
- public static OracleLoadNode createLoadNode(OracleSink oracleSink) {
- String id = oracleSink.getSinkName();
- String name = oracleSink.getSinkName();
- List<SinkField> sinkFieldResponses = oracleSink.getSinkFieldList();
- List<FieldInfo> fields = sinkFieldResponses.stream()
- .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
- Map<String, String> properties = oracleSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+ public static OracleLoadNode createLoadNode(OracleSink oracleSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new OracleLoadNode(
- id,
- name,
- fields,
- fieldRelationShips,
+ oracleSink.getSinkName(),
+ oracleSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
null,
null,
- 1,
properties,
oracleSink.getJdbcUrl(),
oracleSink.getUsername(),
@@ -535,25 +451,16 @@ public class LoadNodeUtils {
/**
* Create load node of TDSQLPostgreSQL.
*/
- public static TDSQLPostgresLoadNode createLoadNode(TDSQLPostgreSQLSink tdsqlPostgreSQLSink) {
- String id = tdsqlPostgreSQLSink.getSinkName();
- String name = tdsqlPostgreSQLSink.getSinkName();
- List<SinkField> sinkFieldResponses = tdsqlPostgreSQLSink.getSinkFieldList();
- List<FieldInfo> fields = sinkFieldResponses.stream()
- .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
- Map<String, String> properties = tdsqlPostgreSQLSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+ public static TDSQLPostgresLoadNode createLoadNode(TDSQLPostgreSQLSink tdsqlPostgreSQLSink,
+ List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new TDSQLPostgresLoadNode(
- id,
- name,
- fields,
- fieldRelationShips,
+ tdsqlPostgreSQLSink.getSinkName(),
+ tdsqlPostgreSQLSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
null,
null,
- 1,
properties,
tdsqlPostgreSQLSink.getJdbcUrl(),
tdsqlPostgreSQLSink.getUsername(),
@@ -565,25 +472,16 @@ public class LoadNodeUtils {
/**
* Create load node of DLCIceberg.
*/
- public static DLCIcebergLoadNode createLoadNode(DLCIcebergSink dlcIcebergSink) {
- String id = dlcIcebergSink.getSinkName();
- String name = dlcIcebergSink.getSinkName();
- List<SinkField> sinkFields = dlcIcebergSink.getSinkFieldList();
- List<FieldInfo> fields = sinkFields.stream()
- .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
- Map<String, String> properties = dlcIcebergSink.getProperties().entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+ public static DLCIcebergLoadNode createLoadNode(DLCIcebergSink dlcIcebergSink, List<FieldInfo> fieldInfos,
+ List<FieldRelation> fieldRelations, Map<String, String> properties) {
return new DLCIcebergLoadNode(
- id,
- name,
- fields,
- fieldRelationShips,
+ dlcIcebergSink.getSinkName(),
+ dlcIcebergSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
null,
null,
- 1,
properties,
dlcIcebergSink.getDbName(),
dlcIcebergSink.getTableName(),
@@ -596,23 +494,31 @@ public class LoadNodeUtils {
/**
* Parse information field of data sink.
*/
- public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName) {
+ public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName,
+ Map<String, StreamField> constantFieldMap) {
if (CollectionUtils.isEmpty(fieldList)) {
return Lists.newArrayList();
}
return fieldList.stream()
.filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
.map(field -> {
- String fieldName = field.getFieldName();
- String fieldType = field.getFieldType();
- String fieldFormat = field.getFieldFormat();
- FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
- FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
- String sourceFieldName = field.getSourceFieldName();
- String sourceFieldType = field.getSourceFieldType();
- FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
- FieldInfoUtils.convertFieldFormat(sourceFieldType));
- return new FieldRelation(sourceField, sinkField);
+ FieldInfo outputField = new FieldInfo(field.getFieldName(), sinkName,
+ FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
+ FunctionParam inputField;
+ String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
+ StreamField constantField = constantFieldMap.get(fieldKey);
+ if (constantField != null) {
+ if (outputField.getFormatInfo() != null
+ && outputField.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
+ inputField = new StringConstantParam(constantField.getFieldValue());
+ } else {
+ inputField = new ConstantParam(constantField.getFieldValue());
+ }
+ } else {
+ inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
+ FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
+ }
+ return new FieldRelation(inputField, outputField);
}).collect(Collectors.toList());
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index cbf831cab..05cb6367f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -33,6 +33,7 @@ import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.OrderDirection;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -41,21 +42,25 @@ import java.util.stream.Collectors;
@Slf4j
public class TransformNodeUtils {
- public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses) {
+ public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses,
+ Map<String, StreamField> constantFieldMap) {
if (CollectionUtils.isEmpty(transformResponses)) {
return Lists.newArrayList();
}
- return transformResponses.stream().map(TransformNodeUtils::createTransformNode).collect(Collectors.toList());
+ return transformResponses.stream()
+ .map(s -> TransformNodeUtils.createTransformNode(s, constantFieldMap)).collect(Collectors.toList());
}
- public static TransformNode createTransformNode(TransformResponse transformResponse) {
+ public static TransformNode createTransformNode(TransformResponse transformResponse,
+ Map<String, StreamField> constantFieldMap) {
TransformType transformType = TransformType.forType(transformResponse.getTransformType());
if (transformType == TransformType.DE_DUPLICATION) {
TransformDefinition transformDefinition = StreamParseUtils.parseTransformDefinition(
transformResponse.getTransformDefinition(), transformType);
- return createDistinctNode((DeDuplicationDefinition) transformDefinition, transformResponse);
+ return createDistinctNode((DeDuplicationDefinition) transformDefinition,
+ transformResponse, constantFieldMap);
} else {
- return createNormalTransformNode(transformResponse);
+ return createNormalTransformNode(transformResponse, constantFieldMap);
}
}
@@ -63,7 +68,7 @@ public class TransformNodeUtils {
* Create distinct node based on deDuplicationDefinition
*/
public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
- TransformResponse transformResponse) {
+ TransformResponse transformResponse, Map<String, StreamField> constantFieldMap) {
List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
List<FieldInfo> distinctFields = streamFields.stream()
.map(FieldInfoUtils::parseStreamField)
@@ -83,7 +88,7 @@ public class TransformNodeUtils {
throw new UnsupportedOperationException(
String.format("Unsupported deduplication strategy=%s for inlong", deDuplicationStrategy));
}
- TransformNode transformNode = createNormalTransformNode(transformResponse);
+ TransformNode transformNode = createNormalTransformNode(transformResponse, constantFieldMap);
return new DistinctNode(transformNode.getId(),
transformNode.getName(),
transformNode.getFields(),
@@ -99,14 +104,17 @@ public class TransformNodeUtils {
/**
* Create transform node based on transformResponse
*/
- public static TransformNode createNormalTransformNode(TransformResponse transformResponse) {
+ public static TransformNode createNormalTransformNode(TransformResponse transformResponse,
+ Map<String, StreamField> constantFieldMap) {
TransformNode transformNode = new TransformNode();
transformNode.setId(transformResponse.getTransformName());
transformNode.setName(transformResponse.getTransformName());
+ // Filter constant fields
List<FieldInfo> fieldInfos = transformResponse.getFieldList().stream()
+ .filter(s -> s.getFieldValue() == null)
.map(FieldInfoUtils::parseStreamField).collect(Collectors.toList());
transformNode.setFields(fieldInfos);
- transformNode.setFieldRelations(FieldRelationUtils.createFieldRelations(transformResponse));
+ transformNode.setFieldRelations(FieldRelationUtils.createFieldRelations(transformResponse, constantFieldMap));
transformNode.setFilters(
FilterFunctionUtils.createFilterFunctions(transformResponse));
transformNode.setFilterStrategy(FilterFunctionUtils.parseFilterStrategy(transformResponse));