You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/20 02:20:23 UTC

[inlong] branch master updated: [INLONG-5103][Manager] Add constant field support for stream source and transform node (#5106)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f6f9007a1 [INLONG-5103][Manager] Add constant field support for stream source and transform node (#5106)
f6f9007a1 is described below

commit f6f9007a1ac5e95c6a371b36d4de6f3d7e8bad06
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Wed Jul 20 10:20:19 2022 +0800

    [INLONG-5103][Manager] Add constant field support for stream source and transform node (#5106)
---
 .../service/sort/DefaultSortConfigOperator.java    |  69 ++--
 .../service/sort/util/ExtractNodeUtils.java        | 143 ++++----
 .../service/sort/util/FieldRelationUtils.java      |  62 +++-
 .../manager/service/sort/util/LoadNodeUtils.java   | 358 ++++++++-------------
 .../service/sort/util/TransformNodeUtils.java      |  26 +-
 5 files changed, 285 insertions(+), 373 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
index cf77c6c28..13ad3fc65 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/DefaultSortConfigOperator.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
@@ -81,12 +82,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 
         GroupInfo configInfo;
         // if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-            configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
-        } else {
-            configInfo = this.getNormalGroupInfo(groupInfo, streamInfos);
-        }
-
+        configInfo = getGroupInfo(groupInfo, streamInfos);
         String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
         if (isStream) {
             this.addToStreamExt(streamInfos, dataflow);
@@ -99,7 +95,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         }
     }
 
-    private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+    private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
         // get source info
         Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
         // get sink info
@@ -114,12 +110,16 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         List<StreamInfo> sortStreamInfos = new ArrayList<>();
         for (InlongStreamInfo inlongStream : streamInfoList) {
             String streamId = inlongStream.getInlongStreamId();
-
+            Map<String, StreamField> constantFieldMap = new HashMap<>();
+            inlongStream.getSourceList()
+                    .forEach(s -> parseConstantFieldMap(s.getSourceName(), s.getFieldList(), constantFieldMap));
             List<TransformResponse> transformResponseList = transformMap.get(streamId);
+            transformResponseList
+                    .forEach(s -> parseConstantFieldMap(s.getTransformName(), s.getFieldList(), constantFieldMap));
             List<Node> nodes = this.createNodesForStream(sourceMap.get(streamId),
-                    transformResponseList, sinkMap.get(streamId));
-            StreamInfo streamInfo = new StreamInfo(streamId, nodes,
-                    NodeRelationUtils.createNodeRelationsForStream(inlongStream));
+                    transformResponseList, sinkMap.get(streamId), constantFieldMap);
+            List<NodeRelation> relations = NodeRelationUtils.createNodeRelationsForStream(inlongStream);
+            StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
             sortStreamInfos.add(streamInfo);
 
             // rebuild joinerNode relation
@@ -130,44 +130,31 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     }
 
     /**
-     * Get Sort GroupInfo of normal inlong group.
+     * Get constant field from stream fields
      *
-     * @see org.apache.inlong.sort.protocol.GroupInfo
+     * @param nodeId The node id
+     * @param fields The stream fields
+     * @param constantFieldMap The constant field map
      */
-    private GroupInfo getNormalGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
-        // get source info
-        Map<String, List<StreamSource>> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
-        // get sink info
-        Map<String, List<StreamSink>> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
-
-        // create StreamInfo for Sort protocol
-        List<StreamInfo> sortStreamInfos = new ArrayList<>();
-        for (InlongStreamInfo inlongStream : streamInfoList) {
-            String streamId = inlongStream.getInlongStreamId();
-            List<StreamSource> sources = sourceMap.get(streamId);
-            List<StreamSink> sinks = sinkMap.get(streamId);
-            StreamInfo sortStream = new StreamInfo(streamId,
-                    this.createNodesForStream(sources, sinks),
-                    this.createNodeRelationsForStream(sources, sinks));
-            sortStreamInfos.add(sortStream);
+    private void parseConstantFieldMap(String nodeId, List<StreamField> fields,
+            Map<String, StreamField> constantFieldMap) {
+        if (CollectionUtils.isEmpty(fields)) {
+            return;
+        }
+        for (StreamField field : fields) {
+            if (field.getFieldValue() != null) {
+                constantFieldMap.put(String.format("%s-%s", nodeId, field.getFieldName()), field);
+            }
         }
-
-        return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
-    }
-
-    private List<Node> createNodesForStream(List<StreamSource> sources, List<StreamSink> streamSinks) {
-        List<Node> nodes = Lists.newArrayList();
-        nodes.addAll(ExtractNodeUtils.createExtractNodes(sources));
-        nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
-        return nodes;
     }
 
     private List<Node> createNodesForStream(List<StreamSource> sourceInfos,
-            List<TransformResponse> transformResponses, List<StreamSink> streamSinks) {
+            List<TransformResponse> transformResponses, List<StreamSink> streamSinks,
+            Map<String, StreamField> constantFieldMap) {
         List<Node> nodes = Lists.newArrayList();
         nodes.addAll(ExtractNodeUtils.createExtractNodes(sourceInfos));
-        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses));
-        nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks));
+        nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
+        nodes.addAll(LoadNodeUtils.createLoadNodes(streamSinks, constantFieldMap));
         return nodes;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 22d82b3ac..42f1b1128 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -109,8 +109,6 @@ public class ExtractNodeUtils {
      * @return MySql extract node info
      */
     public static MySqlExtractNode createExtractNode(MySQLBinlogSource binlogSource) {
-        final String id = binlogSource.getSourceName();
-        final String name = binlogSource.getSourceName();
         final String database = binlogSource.getDatabaseWhiteList();
         final String primaryKey = binlogSource.getPrimaryKey();
         final String hostName = binlogSource.getHostname();
@@ -123,16 +121,12 @@ public class ExtractNodeUtils {
         }
         String tables = binlogSource.getTableWhiteList();
         final List<String> tableNames = Splitter.on(",").splitToList(tables);
-        final List<StreamField> streamFields = binlogSource.getFieldList();
-        final List<FieldInfo> fieldInfos = streamFields.stream()
-                .map(streamField -> FieldInfoUtils.parseStreamFieldInfo(streamField, name))
-                .collect(Collectors.toList());
+        List<FieldInfo> fieldInfos = parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
         final String serverTimeZone = binlogSource.getServerTimezone();
         boolean incrementalSnapshotEnabled = true;
 
         // TODO Needs to be configurable for those parameters
-        Map<String, String> properties = binlogSource.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        Map<String, String> properties = parseProperties(binlogSource.getProperties());
         if (binlogSource.isAllMigration()) {
             // Unique properties when migrate all tables in database
             incrementalSnapshotEnabled = false;
@@ -142,8 +136,8 @@ public class ExtractNodeUtils {
             incrementalSnapshotEnabled = false;
             properties.put("scan.incremental.snapshot.enabled", "false");
         }
-        return new MySqlExtractNode(id,
-                name,
+        return new MySqlExtractNode(binlogSource.getSourceName(),
+                binlogSource.getSourceName(),
                 fieldInfos,
                 null,
                 properties,
@@ -166,12 +160,7 @@ public class ExtractNodeUtils {
      * @return Kafka extract node info
      */
     public static KafkaExtractNode createExtractNode(KafkaSource kafkaSource) {
-        String id = kafkaSource.getSourceName();
-        String name = kafkaSource.getSourceName();
-        List<StreamField> streamFields = kafkaSource.getFieldList();
-        List<FieldInfo> fieldInfos = streamFields.stream()
-                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
-                .collect(Collectors.toList());
+        List<FieldInfo> fieldInfos = parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
         String topic = kafkaSource.getTopic();
         String bootstrapServers = kafkaSource.getBootstrapServers();
         Format format;
@@ -210,11 +199,10 @@ public class ExtractNodeUtils {
         }
         final String primaryKey = kafkaSource.getPrimaryKey();
         String groupId = kafkaSource.getGroupId();
-        Map<String, String> properties = kafkaSource.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        Map<String, String> properties = parseProperties(kafkaSource.getProperties());
         String partitionOffset = kafkaSource.getPartitionOffsets();
-        return new KafkaExtractNode(id,
-                name,
+        return new KafkaExtractNode(kafkaSource.getSourceName(),
+                kafkaSource.getSourceName(),
                 fieldInfos,
                 null,
                 properties,
@@ -235,12 +223,7 @@ public class ExtractNodeUtils {
      * @return Pulsar extract node info
      */
     public static PulsarExtractNode createExtractNode(PulsarSource pulsarSource) {
-        String id = pulsarSource.getSourceName();
-        String name = pulsarSource.getSourceName();
-        List<StreamField> streamFields = pulsarSource.getFieldList();
-        List<FieldInfo> fieldInfos = streamFields.stream()
-                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
-                .collect(Collectors.toList());
+        List<FieldInfo> fieldInfos = parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
         String fullTopicName =
                 pulsarSource.getTenant() + "/" + pulsarSource.getNamespace() + "/" + pulsarSource.getTopic();
 
@@ -274,10 +257,9 @@ public class ExtractNodeUtils {
         final String primaryKey = pulsarSource.getPrimaryKey();
         final String serviceUrl = pulsarSource.getServiceUrl();
         final String adminUrl = pulsarSource.getAdminUrl();
-        Map<String, String> properties = pulsarSource.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-        return new PulsarExtractNode(id,
-                name,
+        Map<String, String> properties = parseProperties(pulsarSource.getProperties());
+        return new PulsarExtractNode(pulsarSource.getSourceName(),
+                pulsarSource.getSourceName(),
                 fieldInfos,
                 null,
                 properties,
@@ -296,20 +278,14 @@ public class ExtractNodeUtils {
      * @return PostgreSQL extract node info
      */
     public static PostgresExtractNode createExtractNode(PostgreSQLSource postgreSQLSource) {
-        List<StreamField> streamFields = postgreSQLSource.getFieldList();
-        String id = postgreSQLSource.getSourceName();
-        String name = postgreSQLSource.getSourceName();
-        List<FieldInfo> fields = streamFields.stream()
-                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
-                .collect(Collectors.toList());
-        Map<String, String> properties = postgreSQLSource.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-        return new PostgresExtractNode(id, name, fields, null, properties,
-                postgreSQLSource.getPrimaryKey(), postgreSQLSource.getTableNameList(),
-                postgreSQLSource.getHostname(), postgreSQLSource.getUsername(),
-                postgreSQLSource.getPassword(), postgreSQLSource.getDatabase(),
-                postgreSQLSource.getSchema(), postgreSQLSource.getPort(),
-                postgreSQLSource.getDecodingPluginName());
+        List<FieldInfo> fieldInfos = parseFieldInfos(postgreSQLSource.getFieldList(), postgreSQLSource.getSourceName());
+        Map<String, String> properties = parseProperties(postgreSQLSource.getProperties());
+        return new PostgresExtractNode(postgreSQLSource.getSourceName(), postgreSQLSource.getSourceName(),
+                fieldInfos, null, properties, postgreSQLSource.getPrimaryKey(),
+                postgreSQLSource.getTableNameList(), postgreSQLSource.getHostname(),
+                postgreSQLSource.getUsername(), postgreSQLSource.getPassword(),
+                postgreSQLSource.getDatabase(), postgreSQLSource.getSchema(),
+                postgreSQLSource.getPort(), postgreSQLSource.getDecodingPluginName());
     }
 
     /**
@@ -319,19 +295,13 @@ public class ExtractNodeUtils {
      * @return oracle extract node info
      */
     public static OracleExtractNode createExtractNode(OracleSource source) {
-        String name = source.getSourceName();
-        List<StreamField> streamFieldInfos = source.getFieldList();
-        final List<FieldInfo> fieldInfos = streamFieldInfos.stream()
-                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
-                .collect(Collectors.toList());
-
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
         ScanStartUpMode scanStartupMode = StringUtils.isBlank(source.getScanStartupMode())
                 ? null : ScanStartUpMode.forName(source.getScanStartupMode());
-        Map<String, String> properties = source.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        Map<String, String> properties = parseProperties(source.getProperties());
         return new OracleExtractNode(
-                name,
-                name,
+                source.getSourceName(),
+                source.getSourceName(),
                 fieldInfos,
                 null,
                 properties,
@@ -354,17 +324,11 @@ public class ExtractNodeUtils {
      * @return SQLServer extract node info
      */
     public static SqlServerExtractNode createExtractNode(SQLServerSource source) {
-        String name = source.getSourceName();
-        List<StreamField> streamFields = source.getFieldList();
-        List<FieldInfo> fieldInfos = streamFields.stream()
-                .map(fieldInfo -> FieldInfoUtils.parseStreamFieldInfo(fieldInfo, name))
-                .collect(Collectors.toList());
-
-        Map<String, String> properties = source.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+        Map<String, String> properties = parseProperties(source.getProperties());
         return new SqlServerExtractNode(
-                name,
-                name,
+                source.getSourceName(),
+                source.getSourceName(),
                 fieldInfos,
                 null,
                 properties,
@@ -387,16 +351,11 @@ public class ExtractNodeUtils {
      * @return MongoDB extract node info
      */
     public static MongoExtractNode createExtractNode(MongoDBSource source) {
-        String name = source.getSourceName();
-        List<StreamField> streamFields = source.getFieldList();
-        List<FieldInfo> fieldInfos = streamFields.stream()
-                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
-                .collect(Collectors.toList());
-        Map<String, String> properties = source.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+        Map<String, String> properties = parseProperties(source.getProperties());
         return new MongoExtractNode(
-                name,
-                name,
+                source.getSourceName(),
+                source.getSourceName(),
                 fieldInfos,
                 null,
                 properties,
@@ -415,16 +374,11 @@ public class ExtractNodeUtils {
      * @return TubeMQ extract node info
      */
     public static TubeMQExtractNode createExtractNode(TubeMQSource source) {
-        String name = source.getSourceName();
-        List<StreamField> streamFields = source.getFieldList();
-        List<FieldInfo> fieldInfos = streamFields.stream()
-                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
-                .collect(Collectors.toList());
-        Map<String, String> properties = source.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+        Map<String, String> properties = parseProperties(source.getProperties());
         return new TubeMQExtractNode(
-                name,
-                name,
+                source.getSourceName(),
+                source.getSourceName(),
                 fieldInfos,
                 null,
                 properties,
@@ -437,4 +391,29 @@ public class ExtractNodeUtils {
         );
     }
 
+    /**
+     * Parse FieldInfos
+     *
+     * @param streamFields The stream fields
+     * @param nodeId The node id
+     * @return FieldInfo list
+     */
+    private static List<FieldInfo> parseFieldInfos(List<StreamField> streamFields, String nodeId) {
+        // Filter constant fields
+        return streamFields.stream().filter(s -> s.getFieldValue() == null)
+                .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Parse properties
+     *
+     * @param properties The properties with string key and object value
+     * @return The properties with string key and string value
+     */
+    private static Map<String, String> parseProperties(Map<String, Object> properties) {
+        return properties.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
index 5793db886..a672a98fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationUtils.java
@@ -33,10 +33,12 @@ import org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefiniti
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.util.StreamParseUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.transformation.CascadeFunction;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 import org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
 import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
@@ -56,7 +58,8 @@ public class FieldRelationUtils {
     /**
      * Create relation of fields.
      */
-    public static List<FieldRelation> createFieldRelations(TransformResponse transformResponse) {
+    public static List<FieldRelation> createFieldRelations(TransformResponse transformResponse,
+            Map<String, StreamField> constantFieldMap) {
         TransformType transformType = TransformType.forType(transformResponse.getTransformType());
         TransformDefinition transformDefinition = StreamParseUtils.parseTransformDefinition(
                 transformResponse.getTransformDefinition(), transformType);
@@ -66,15 +69,17 @@ public class FieldRelationUtils {
         switch (transformType) {
             case SPLITTER:
                 SplitterDefinition splitterDefinition = (SplitterDefinition) transformDefinition;
-                return createSplitterFieldRelations(fieldList, transformName, splitterDefinition, preNodes);
+                return createSplitterFieldRelations(fieldList, transformName, splitterDefinition,
+                        preNodes, constantFieldMap);
             case STRING_REPLACER:
                 StringReplacerDefinition replacerDefinition = (StringReplacerDefinition) transformDefinition;
-                return createReplacerFieldRelations(fieldList, transformName, replacerDefinition, preNodes);
+                return createReplacerFieldRelations(fieldList, transformName,
+                        replacerDefinition, preNodes, constantFieldMap);
             case DE_DUPLICATION:
             case FILTER:
-                return createFieldRelations(fieldList, transformName);
+                return createFieldRelations(fieldList, transformName, constantFieldMap);
             case JOINER:
-                return createJoinerFieldRelations(fieldList, transformName);
+                return createJoinerFieldRelations(fieldList, transformName, constantFieldMap);
             default:
                 throw new UnsupportedOperationException(
                         String.format("Unsupported transformType=%s", transformType));
@@ -84,12 +89,25 @@ public class FieldRelationUtils {
     /**
      * Create relation of fields.
      */
-    private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName) {
+    private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName,
+            Map<String, StreamField> constantFieldMap) {
         return fieldList.stream()
                 .map(FieldInfoUtils::parseStreamField)
                 .map(fieldInfo -> {
-                    FieldInfo inputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getNodeId(),
-                            fieldInfo.getFormatInfo());
+                    FunctionParam inputField;
+                    String fieldKey = String.format("%s-%s", fieldInfo.getNodeId(), fieldInfo.getName());
+                    StreamField constantField = constantFieldMap.get(fieldKey);
+                    if (constantField != null) {
+                        if (fieldInfo.getFormatInfo() != null
+                                && fieldInfo.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
+                            inputField = new StringConstantParam(constantField.getFieldValue());
+                        } else {
+                            inputField = new ConstantParam(constantField.getFieldValue());
+                        }
+                    } else {
+                        inputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getNodeId(),
+                                fieldInfo.getFormatInfo());
+                    }
                     FieldInfo outputField = new FieldInfo(fieldInfo.getName(), transformName,
                             fieldInfo.getFormatInfo());
                     return new FieldRelation(inputField, outputField);
@@ -99,13 +117,26 @@ public class FieldRelationUtils {
     /**
      * Create relation of fields in join function.
      */
-    private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName) {
+    private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName,
+            Map<String, StreamField> constantFieldMap) {
         return fieldList.stream()
                 .map(streamField -> {
                     FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
                             streamField.getFieldType(), streamField.getFieldFormat());
-                    FieldInfo inputField = new FieldInfo(streamField.getOriginFieldName(),
-                            streamField.getOriginNodeName(), formatInfo);
+                    FunctionParam inputField;
+                    String fieldKey = String.format("%s-%s", streamField.getOriginNodeName(),
+                            streamField.getOriginFieldName());
+                    StreamField constantField = constantFieldMap.get(fieldKey);
+                    if (constantField != null) {
+                        if (formatInfo != null && formatInfo.getTypeInfo() == StringTypeInfo.INSTANCE) {
+                            inputField = new StringConstantParam(constantField.getFieldValue());
+                        } else {
+                            inputField = new ConstantParam(constantField.getFieldValue());
+                        }
+                    } else {
+                        inputField = new FieldInfo(streamField.getOriginFieldName(),
+                                streamField.getOriginNodeName(), formatInfo);
+                    }
                     FieldInfo outputField = new FieldInfo(streamField.getFieldName(),
                             transformName, formatInfo);
                     return new FieldRelation(inputField, outputField);
@@ -117,7 +148,8 @@ public class FieldRelationUtils {
      */
     private static List<FieldRelation> createSplitterFieldRelations(
             List<StreamField> fieldList, String transformName,
-            SplitterDefinition splitterDefinition, String preNodes) {
+            SplitterDefinition splitterDefinition, String preNodes,
+            Map<String, StreamField> constantFieldMap) {
         Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
         String preNode = preNodes.split(",")[0];
         List<SplitRule> splitRules = splitterDefinition.getSplitRules();
@@ -131,7 +163,7 @@ public class FieldRelationUtils {
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> !splitFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
-        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
         return fieldRelations;
     }
 
@@ -139,7 +171,7 @@ public class FieldRelationUtils {
      * Create relation of fields in replace function.
      */
     private static List<FieldRelation> createReplacerFieldRelations(List<StreamField> fieldList, String transformName,
-            StringReplacerDefinition replacerDefinition, String preNodes) {
+            StringReplacerDefinition replacerDefinition, String preNodes, Map<String, StreamField> constantFieldMap) {
         Preconditions.checkNotEmpty(preNodes, "PreNodes of splitter should not be null");
         String preNode = preNodes.split(",")[0];
         List<ReplaceRule> replaceRules = replacerDefinition.getReplaceRules();
@@ -151,7 +183,7 @@ public class FieldRelationUtils {
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> !replaceFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
-        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName));
+        fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap));
         return fieldRelations;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 0e0c135cd..49ad68231 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -41,6 +41,8 @@ import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSink;
 import org.apache.inlong.manager.common.pojo.sink.postgresql.PostgreSQLSink;
 import org.apache.inlong.manager.common.pojo.sink.sqlserver.SQLServerSink;
 import org.apache.inlong.manager.common.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
 import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -64,7 +66,10 @@ import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
 import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
 import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
 
 import java.util.HashMap;
 import java.util.List;
@@ -79,47 +84,56 @@ public class LoadNodeUtils {
     /**
      * Create nodes of data load.
      */
-    public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks) {
+    public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks,
+            Map<String, StreamField> constantFieldMap) {
         if (CollectionUtils.isEmpty(streamSinks)) {
             return Lists.newArrayList();
         }
-        return streamSinks.stream().map(LoadNodeUtils::createLoadNode).collect(Collectors.toList());
+        return streamSinks.stream()
+                .map(s -> LoadNodeUtils.createLoadNode(s, constantFieldMap)).collect(Collectors.toList());
     }
 
     /**
      * Create load node from the stream sink info.
      */
-    public static LoadNode createLoadNode(StreamSink streamSink) {
+    public static LoadNode createLoadNode(StreamSink streamSink, Map<String, StreamField> constantFieldMap) {
+        List<FieldInfo> fieldInfos = streamSink.getSinkFieldList().stream()
+                .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, streamSink.getSinkName()))
+                .collect(Collectors.toList());
+        List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(),
+                streamSink.getSinkName(), constantFieldMap);
+        Map<String, String> properties = streamSink.getProperties().entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
         SinkType sinkType = SinkType.forType(streamSink.getSinkType());
         switch (sinkType) {
             case KAFKA:
-                return createLoadNode((KafkaSink) streamSink);
+                return createLoadNode((KafkaSink) streamSink, fieldInfos, fieldRelations, properties);
             case HIVE:
-                return createLoadNode((HiveSink) streamSink);
+                return createLoadNode((HiveSink) streamSink, fieldInfos, fieldRelations, properties);
             case HBASE:
-                return createLoadNode((HBaseSink) streamSink);
+                return createLoadNode((HBaseSink) streamSink, fieldInfos, fieldRelations, properties);
             case POSTGRES:
-                return createLoadNode((PostgreSQLSink) streamSink);
+                return createLoadNode((PostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
             case CLICKHOUSE:
-                return createLoadNode((ClickHouseSink) streamSink);
+                return createLoadNode((ClickHouseSink) streamSink, fieldInfos, fieldRelations, properties);
             case ICEBERG:
-                return createLoadNode((IcebergSink) streamSink);
+                return createLoadNode((IcebergSink) streamSink, fieldInfos, fieldRelations, properties);
             case SQLSERVER:
-                return createLoadNode((SQLServerSink) streamSink);
+                return createLoadNode((SQLServerSink) streamSink, fieldInfos, fieldRelations, properties);
             case ELASTICSEARCH:
-                return createLoadNode((ElasticsearchSink) streamSink);
+                return createLoadNode((ElasticsearchSink) streamSink, fieldInfos, fieldRelations, properties);
             case HDFS:
-                return createLoadNode((HDFSSink) streamSink);
+                return createLoadNode((HDFSSink) streamSink, fieldInfos, fieldRelations, properties);
             case GREENPLUM:
-                return createLoadNode((GreenplumSink) streamSink);
+                return createLoadNode((GreenplumSink) streamSink, fieldInfos, fieldRelations, properties);
             case MYSQL:
-                return createLoadNode((MySQLSink) streamSink);
+                return createLoadNode((MySQLSink) streamSink, fieldInfos, fieldRelations, properties);
             case ORACLE:
-                return createLoadNode((OracleSink) streamSink);
+                return createLoadNode((OracleSink) streamSink, fieldInfos, fieldRelations, properties);
             case TDSQLPOSTGRESQL:
-                return createLoadNode((TDSQLPostgreSQLSink) streamSink);
+                return createLoadNode((TDSQLPostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
             case DLCICEBERG:
-                return createLoadNode((DLCIcebergSink) streamSink);
+                return createLoadNode((DLCIcebergSink) streamSink, fieldInfos, fieldRelations, properties);
             default:
                 throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
         }
@@ -128,16 +142,8 @@ public class LoadNodeUtils {
     /**
      * Create load node of Kafka.
      */
-    public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink) {
-        String id = kafkaSink.getSinkName();
-        String name = kafkaSink.getSinkName();
-        List<SinkField> fieldList = kafkaSink.getSinkFieldList();
-        List<FieldInfo> fieldInfos = fieldList.stream()
-                .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = kafkaSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+    public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         Integer sinkParallelism = null;
         if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
             sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
@@ -165,8 +171,8 @@ public class LoadNodeUtils {
         }
 
         return new KafkaLoadNode(
-                id,
-                name,
+                kafkaSink.getSinkName(),
+                kafkaSink.getSinkName(),
                 fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
@@ -183,27 +189,19 @@ public class LoadNodeUtils {
     /**
      * Create load node of Hive.
      */
-    public static HiveLoadNode createLoadNode(HiveSink hiveSink) {
-        String id = hiveSink.getSinkName();
-        String name = hiveSink.getSinkName();
-        List<SinkField> fieldList = hiveSink.getSinkFieldList();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = hiveSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+    public static HiveLoadNode createLoadNode(HiveSink hiveSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         List<FieldInfo> partitionFields = Lists.newArrayList();
         if (CollectionUtils.isNotEmpty(hiveSink.getPartitionFieldList())) {
             partitionFields = hiveSink.getPartitionFieldList().stream()
-                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), name,
+                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hiveSink.getSinkName(),
                             FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
                                     partitionField.getFieldFormat()))).collect(Collectors.toList());
         }
         return new HiveLoadNode(
-                id,
-                name,
-                fields,
+                hiveSink.getSinkName(),
+                hiveSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
@@ -222,20 +220,12 @@ public class LoadNodeUtils {
     /**
      * Create load node of HBase.
      */
-    public static HbaseLoadNode createLoadNode(HBaseSink hbaseSink) {
-        String id = hbaseSink.getSinkName();
-        String name = hbaseSink.getSinkName();
-        List<SinkField> fieldList = hbaseSink.getSinkFieldList();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = hbaseSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+    public static HbaseLoadNode createLoadNode(HBaseSink hbaseSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new HbaseLoadNode(
-                id,
-                name,
-                fields,
+                hbaseSink.getSinkName(),
+                hbaseSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
@@ -255,23 +245,17 @@ public class LoadNodeUtils {
     /**
      * Create load node of PostgreSQL.
      */
-    public static PostgresLoadNode createLoadNode(PostgreSQLSink postgreSQLSink) {
-        List<SinkField> fieldList = postgreSQLSink.getSinkFieldList();
-        String name = postgreSQLSink.getSinkName();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-
+    public static PostgresLoadNode createLoadNode(PostgreSQLSink postgreSQLSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new PostgresLoadNode(
-                name,
-                name,
-                fields,
+                postgreSQLSink.getSinkName(),
+                postgreSQLSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 null,
                 null,
-                1,
                 null,
+                properties,
                 postgreSQLSink.getJdbcUrl(),
                 postgreSQLSink.getUsername(),
                 postgreSQLSink.getPassword(),
@@ -283,23 +267,17 @@ public class LoadNodeUtils {
     /**
      * Create load node of ClickHouse.
      */
-    public static ClickHouseLoadNode createLoadNode(ClickHouseSink ckSink) {
-        List<SinkField> sinkFields = ckSink.getSinkFieldList();
-        String name = ckSink.getSinkName();
-        List<FieldInfo> fields = sinkFields.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(sinkFields, name);
-
+    public static ClickHouseLoadNode createLoadNode(ClickHouseSink ckSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new ClickHouseLoadNode(
-                name,
-                name,
-                fields,
+                ckSink.getSinkName(),
+                ckSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 null,
                 null,
-                1,
                 null,
+                properties,
                 ckSink.getTableName(),
                 ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
                 ckSink.getUsername(),
@@ -310,26 +288,17 @@ public class LoadNodeUtils {
     /**
      * Create load node of Iceberg.
      */
-    public static IcebergLoadNode createLoadNode(IcebergSink icebergSink) {
-        String id = icebergSink.getSinkName();
-        String name = icebergSink.getSinkName();
+    public static IcebergLoadNode createLoadNode(IcebergSink icebergSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
-        List<SinkField> sinkFields = icebergSink.getSinkFieldList();
-        List<FieldInfo> fields = sinkFields.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
-        Map<String, String> properties = icebergSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
         return new IcebergLoadNode(
-                id,
-                name,
-                fields,
-                fieldRelationShips,
+                icebergSink.getSinkName(),
+                icebergSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
                 null,
                 null,
-                1,
                 properties,
                 icebergSink.getDbName(),
                 icebergSink.getTableName(),
@@ -343,21 +312,12 @@ public class LoadNodeUtils {
     /**
      * Create load node of SQLServer.
      */
-    public static SqlServerLoadNode createLoadNode(SQLServerSink sqlServerSink) {
-        final String id = sqlServerSink.getSinkName();
-        final String name = sqlServerSink.getSinkName();
-        final List<SinkField> fieldList = sqlServerSink.getSinkFieldList();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = sqlServerSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+    public static SqlServerLoadNode createLoadNode(SQLServerSink sqlServerSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new SqlServerLoadNode(
-                id,
-                name,
-                fields,
+                sqlServerSink.getSinkName(),
+                sqlServerSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 null,
                 null,
@@ -375,21 +335,12 @@ public class LoadNodeUtils {
     /**
      * Create Elasticsearch load node
      */
-    public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsearchSink) {
-        final String id = elasticsearchSink.getSinkName();
-        final String name = elasticsearchSink.getSinkName();
-        final List<SinkField> fieldList = elasticsearchSink.getSinkFieldList();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = elasticsearchSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+    public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsearchSink,
+            List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new ElasticsearchLoadNode(
-                id,
-                name,
-                fields,
+                elasticsearchSink.getSinkName(),
+                elasticsearchSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 null,
                 null,
@@ -408,29 +359,21 @@ public class LoadNodeUtils {
     /**
      * Create load node of HDFS.
      */
-    public static FileSystemLoadNode createLoadNode(HDFSSink hdfsSink) {
-        String id = hdfsSink.getSinkName();
-        String name = hdfsSink.getSinkName();
-        List<SinkField> fieldList = hdfsSink.getSinkFieldList();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = hdfsSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
+    public static FileSystemLoadNode createLoadNode(HDFSSink hdfsSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         List<FieldInfo> partitionFields = Lists.newArrayList();
         if (CollectionUtils.isNotEmpty(hdfsSink.getPartitionFieldList())) {
             partitionFields = hdfsSink.getPartitionFieldList().stream()
-                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), name,
+                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hdfsSink.getSinkName(),
                             FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
                                     partitionField.getFieldFormat())))
                     .collect(Collectors.toList());
         }
 
         return new FileSystemLoadNode(
-                id,
-                name,
-                fields,
+                hdfsSink.getSinkName(),
+                hdfsSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 hdfsSink.getDataPath(),
@@ -445,25 +388,16 @@ public class LoadNodeUtils {
     /**
      * Create greenplum load node
      */
-    public static GreenplumLoadNode createLoadNode(GreenplumSink greenplumSink) {
-        String id = greenplumSink.getSinkName();
-        String name = greenplumSink.getSinkName();
-        List<SinkField> fieldList = greenplumSink.getSinkFieldList();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = greenplumSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+    public static GreenplumLoadNode createLoadNode(GreenplumSink greenplumSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new GreenplumLoadNode(
-                id,
-                name,
-                fields,
+                greenplumSink.getSinkName(),
+                greenplumSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 null,
                 null,
-                1,
+                null,
                 properties,
                 greenplumSink.getJdbcUrl(),
                 greenplumSink.getUsername(),
@@ -475,21 +409,12 @@ public class LoadNodeUtils {
     /**
      * Create load node of MySQL.
      */
-    public static MySqlLoadNode createLoadNode(MySQLSink mysqlSink) {
-        String id = mysqlSink.getSinkName();
-        String name = mysqlSink.getSinkName();
-        List<SinkField> fieldList = mysqlSink.getSinkFieldList();
-        List<FieldInfo> fields = fieldList.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(fieldList, name);
-        Map<String, String> properties = mysqlSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+    public static MySqlLoadNode createLoadNode(MySQLSink mysqlSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new MySqlLoadNode(
-                id,
-                name,
-                fields,
+                mysqlSink.getSinkName(),
+                mysqlSink.getSinkName(),
+                fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
@@ -505,25 +430,16 @@ public class LoadNodeUtils {
     /**
      * Create load node of ORACLE.
      */
-    public static OracleLoadNode createLoadNode(OracleSink oracleSink) {
-        String id = oracleSink.getSinkName();
-        String name = oracleSink.getSinkName();
-        List<SinkField> sinkFieldResponses = oracleSink.getSinkFieldList();
-        List<FieldInfo> fields = sinkFieldResponses.stream()
-                .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
-        Map<String, String> properties = oracleSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+    public static OracleLoadNode createLoadNode(OracleSink oracleSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new OracleLoadNode(
-                id,
-                name,
-                fields,
-                fieldRelationShips,
+                oracleSink.getSinkName(),
+                oracleSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
                 null,
                 null,
-                1,
                 properties,
                 oracleSink.getJdbcUrl(),
                 oracleSink.getUsername(),
@@ -535,25 +451,16 @@ public class LoadNodeUtils {
     /**
      * Create load node of TDSQLPostgreSQL.
      */
-    public static TDSQLPostgresLoadNode createLoadNode(TDSQLPostgreSQLSink tdsqlPostgreSQLSink) {
-        String id = tdsqlPostgreSQLSink.getSinkName();
-        String name = tdsqlPostgreSQLSink.getSinkName();
-        List<SinkField> sinkFieldResponses = tdsqlPostgreSQLSink.getSinkFieldList();
-        List<FieldInfo> fields = sinkFieldResponses.stream()
-                .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
-        Map<String, String> properties = tdsqlPostgreSQLSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+    public static TDSQLPostgresLoadNode createLoadNode(TDSQLPostgreSQLSink tdsqlPostgreSQLSink,
+            List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new TDSQLPostgresLoadNode(
-                id,
-                name,
-                fields,
-                fieldRelationShips,
+                tdsqlPostgreSQLSink.getSinkName(),
+                tdsqlPostgreSQLSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
                 null,
                 null,
-                1,
                 properties,
                 tdsqlPostgreSQLSink.getJdbcUrl(),
                 tdsqlPostgreSQLSink.getUsername(),
@@ -565,25 +472,16 @@ public class LoadNodeUtils {
     /**
      * Create load node of DLCIceberg.
      */
-    public static DLCIcebergLoadNode createLoadNode(DLCIcebergSink dlcIcebergSink) {
-        String id = dlcIcebergSink.getSinkName();
-        String name = dlcIcebergSink.getSinkName();
-        List<SinkField> sinkFields = dlcIcebergSink.getSinkFieldList();
-        List<FieldInfo> fields = sinkFields.stream()
-                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, name))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelationShips = parseSinkFields(sinkFields, name);
-        Map<String, String> properties = dlcIcebergSink.getProperties().entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-
+    public static DLCIcebergLoadNode createLoadNode(DLCIcebergSink dlcIcebergSink, List<FieldInfo> fieldInfos,
+            List<FieldRelation> fieldRelations, Map<String, String> properties) {
         return new DLCIcebergLoadNode(
-                id,
-                name,
-                fields,
-                fieldRelationShips,
+                dlcIcebergSink.getSinkName(),
+                dlcIcebergSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
                 null,
                 null,
-                1,
                 properties,
                 dlcIcebergSink.getDbName(),
                 dlcIcebergSink.getTableName(),
@@ -596,23 +494,31 @@ public class LoadNodeUtils {
     /**
      * Parse information field of data sink.
      */
-    public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName) {
+    public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName,
+            Map<String, StreamField> constantFieldMap) {
         if (CollectionUtils.isEmpty(fieldList)) {
             return Lists.newArrayList();
         }
         return fieldList.stream()
                 .filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
                 .map(field -> {
-                    String fieldName = field.getFieldName();
-                    String fieldType = field.getFieldType();
-                    String fieldFormat = field.getFieldFormat();
-                    FieldInfo sinkField = new FieldInfo(fieldName, sinkName,
-                            FieldInfoUtils.convertFieldFormat(fieldType, fieldFormat));
-                    String sourceFieldName = field.getSourceFieldName();
-                    String sourceFieldType = field.getSourceFieldType();
-                    FieldInfo sourceField = new FieldInfo(sourceFieldName, sinkName,
-                            FieldInfoUtils.convertFieldFormat(sourceFieldType));
-                    return new FieldRelation(sourceField, sinkField);
+                    FieldInfo outputField = new FieldInfo(field.getFieldName(), sinkName,
+                            FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
+                    FunctionParam inputField;
+                    String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
+                    StreamField constantField = constantFieldMap.get(fieldKey);
+                    if (constantField != null) {
+                        if (outputField.getFormatInfo() != null
+                                && outputField.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
+                            inputField = new StringConstantParam(constantField.getFieldValue());
+                        } else {
+                            inputField = new ConstantParam(constantField.getFieldValue());
+                        }
+                    } else {
+                        inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
+                                FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
+                    }
+                    return new FieldRelation(inputField, outputField);
                 }).collect(Collectors.toList());
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
index cbf831cab..05cb6367f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/TransformNodeUtils.java
@@ -33,6 +33,7 @@ import org.apache.inlong.sort.protocol.node.transform.TransformNode;
 import org.apache.inlong.sort.protocol.transformation.OrderDirection;
 
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /**
@@ -41,21 +42,25 @@ import java.util.stream.Collectors;
 @Slf4j
 public class TransformNodeUtils {
 
-    public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses) {
+    public static List<TransformNode> createTransformNodes(List<TransformResponse> transformResponses,
+            Map<String, StreamField> constantFieldMap) {
         if (CollectionUtils.isEmpty(transformResponses)) {
             return Lists.newArrayList();
         }
-        return transformResponses.stream().map(TransformNodeUtils::createTransformNode).collect(Collectors.toList());
+        return transformResponses.stream()
+                .map(s -> TransformNodeUtils.createTransformNode(s, constantFieldMap)).collect(Collectors.toList());
     }
 
-    public static TransformNode createTransformNode(TransformResponse transformResponse) {
+    public static TransformNode createTransformNode(TransformResponse transformResponse,
+            Map<String, StreamField> constantFieldMap) {
         TransformType transformType = TransformType.forType(transformResponse.getTransformType());
         if (transformType == TransformType.DE_DUPLICATION) {
             TransformDefinition transformDefinition = StreamParseUtils.parseTransformDefinition(
                     transformResponse.getTransformDefinition(), transformType);
-            return createDistinctNode((DeDuplicationDefinition) transformDefinition, transformResponse);
+            return createDistinctNode((DeDuplicationDefinition) transformDefinition,
+                    transformResponse, constantFieldMap);
         } else {
-            return createNormalTransformNode(transformResponse);
+            return createNormalTransformNode(transformResponse, constantFieldMap);
         }
     }
 
@@ -63,7 +68,7 @@ public class TransformNodeUtils {
      * Create distinct node based on deDuplicationDefinition
      */
     public static DistinctNode createDistinctNode(DeDuplicationDefinition deDuplicationDefinition,
-            TransformResponse transformResponse) {
+            TransformResponse transformResponse, Map<String, StreamField> constantFieldMap) {
         List<StreamField> streamFields = deDuplicationDefinition.getDupFields();
         List<FieldInfo> distinctFields = streamFields.stream()
                 .map(FieldInfoUtils::parseStreamField)
@@ -83,7 +88,7 @@ public class TransformNodeUtils {
                 throw new UnsupportedOperationException(
                         String.format("Unsupported deduplication strategy=%s for inlong", deDuplicationStrategy));
         }
-        TransformNode transformNode = createNormalTransformNode(transformResponse);
+        TransformNode transformNode = createNormalTransformNode(transformResponse, constantFieldMap);
         return new DistinctNode(transformNode.getId(),
                 transformNode.getName(),
                 transformNode.getFields(),
@@ -99,14 +104,17 @@ public class TransformNodeUtils {
     /**
      * Create transform node based on transformResponse
      */
-    public static TransformNode createNormalTransformNode(TransformResponse transformResponse) {
+    public static TransformNode createNormalTransformNode(TransformResponse transformResponse,
+            Map<String, StreamField> constantFieldMap) {
         TransformNode transformNode = new TransformNode();
         transformNode.setId(transformResponse.getTransformName());
         transformNode.setName(transformResponse.getTransformName());
+        // Exclude constant fields (those with a non-null field value) from the node's fields
         List<FieldInfo> fieldInfos = transformResponse.getFieldList().stream()
+                .filter(s -> s.getFieldValue() == null)
                 .map(FieldInfoUtils::parseStreamField).collect(Collectors.toList());
         transformNode.setFields(fieldInfos);
-        transformNode.setFieldRelations(FieldRelationUtils.createFieldRelations(transformResponse));
+        transformNode.setFieldRelations(FieldRelationUtils.createFieldRelations(transformResponse, constantFieldMap));
         transformNode.setFilters(
                 FilterFunctionUtils.createFilterFunctions(transformResponse));
         transformNode.setFilterStrategy(FilterFunctionUtils.parseFilterStrategy(transformResponse));