You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/06/08 07:20:45 UTC

[inlong] branch master updated: [INLONG-8171][Manager] Optimize the way of creating LoadNodes to make it easy to expand and maintain (#8181)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 716d5011b3 [INLONG-8171][Manager] Optimize the way of creating LoadNodes to make it easy to expand and maintain (#8181)
716d5011b3 is described below

commit 716d5011b3cab4783e619ba937d204570cc83f68
Author: chestnufang <65...@users.noreply.github.com>
AuthorDate: Thu Jun 8 15:20:38 2023 +0800

    [INLONG-8171][Manager] Optimize the way of creating LoadNodes to make it easy to expand and maintain (#8181)
    
    Co-authored-by: chestnufang <ch...@tencent.com>
    Co-authored-by: Charles Zhang <do...@apache.org>
    Co-authored-by: healchow <he...@gmail.com>
---
 .../inlong/manager/common/consts/SinkType.java     |   7 +-
 .../inlong/manager/common/consts/SourceType.java   |   7 +-
 .../consts/{SinkType.java => StreamType.java}      |  18 +-
 ...actory.java => ExtractNodeProviderFactory.java} |  50 +-
 .../pojo/sort/node/LoadNodeProviderFactory.java    |  90 +++
 .../inlong/manager/pojo/sort/node/NodeFactory.java |  64 ++
 .../sort/node/{ => base}/ExtractNodeProvider.java  |  16 +-
 .../pojo/sort/node/base/LoadNodeProvider.java      | 100 +++
 .../pojo/sort/node/{ => base}/NodeProvider.java    |  10 +-
 .../pojo/sort/node/extract/HudiProvider.java       |  64 --
 .../pojo/sort/node/extract/KafkaProvider.java      |  93 ---
 .../pojo/sort/node/extract/RedisProvider.java      | 130 ----
 .../pojo/sort/node/extract/SqlServerProvider.java  |  63 --
 .../sort/node/provider/ClickHouseProvider.java     |  64 ++
 .../pojo/sort/node/provider/DorisProvider.java     |  89 +++
 .../sort/node/provider/ElasticsearchProvider.java  |  94 +++
 .../pojo/sort/node/provider/GreenplumProvider.java |  64 ++
 .../pojo/sort/node/provider/HBaseProvider.java     |  69 ++
 .../pojo/sort/node/provider/HDFSProvider.java      |  76 ++
 .../pojo/sort/node/provider/HiveProvider.java      |  79 ++
 .../pojo/sort/node/provider/HudiProvider.java      |  98 +++
 .../pojo/sort/node/provider/IcebergProvider.java   |  69 ++
 .../pojo/sort/node/provider/KafkaProvider.java     | 159 ++++
 .../pojo/sort/node/provider/KuduProvider.java      |  63 ++
 .../node/{extract => provider}/MongoProvider.java  |   8 +-
 .../MySQLBinlogProvider.java}                      |  12 +-
 .../pojo/sort/node/provider/MySQLProvider.java     |  68 ++
 .../node/{extract => provider}/OracleProvider.java |  47 +-
 .../PostgreSQLProvider.java}                       |  49 +-
 .../node/{extract => provider}/PulsarProvider.java |   8 +-
 .../pojo/sort/node/provider/RedisProvider.java     | 262 +++++++
 .../pojo/sort/node/provider/SQLServerProvider.java |  93 +++
 .../pojo/sort/node/provider/StarRocksProvider.java |  93 +++
 .../node/provider/TDSQLPostgreSQLProvider.java     |  67 ++
 .../node/{extract => provider}/TubeMqProvider.java |   8 +-
 .../manager/pojo/sort/util/LoadNodeUtils.java      | 822 ---------------------
 .../resource/sort/DefaultSortConfigOperator.java   |   7 +-
 37 files changed, 1894 insertions(+), 1286 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index a74c9c3370..be60980f4c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -20,21 +20,16 @@ package org.apache.inlong.manager.common.consts;
 /**
  * Constants of sink type.
  */
-public class SinkType {
+public class SinkType extends StreamType {
 
     public static final String HIVE = "HIVE";
-    public static final String KAFKA = "KAFKA";
     public static final String ICEBERG = "ICEBERG";
-    public static final String HUDI = "HUDI";
     public static final String CLICKHOUSE = "CLICKHOUSE";
     public static final String HBASE = "HBASE";
-    public static final String POSTGRESQL = "POSTGRESQL";
     public static final String ELASTICSEARCH = "ELASTICSEARCH";
-    public static final String SQLSERVER = "SQLSERVER";
     public static final String HDFS = "HDFS";
     public static final String GREENPLUM = "GREENPLUM";
     public static final String MYSQL = "MYSQL";
-    public static final String ORACLE = "ORACLE";
     public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
     public static final String DORIS = "DORIS";
     public static final String STARROCKS = "STARROCKS";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index c0a718f2bb..e01989501a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -25,23 +25,18 @@ import java.util.Map;
 /**
  * Constants of source type.
  */
-public class SourceType {
+public class SourceType extends StreamType {
 
     public static final String AUTO_PUSH = "AUTO_PUSH";
     public static final String TUBEMQ = "TUBEMQ";
     public static final String PULSAR = "PULSAR";
-    public static final String KAFKA = "KAFKA";
 
     public static final String FILE = "FILE";
     public static final String MYSQL_SQL = "MYSQL_SQL";
     public static final String MYSQL_BINLOG = "MYSQL_BINLOG";
-    public static final String POSTGRESQL = "POSTGRESQL";
-    public static final String ORACLE = "ORACLE";
-    public static final String SQLSERVER = "SQLSERVER";
     public static final String MONGODB = "MONGODB";
     public static final String REDIS = "REDIS";
     public static final String MQTT = "MQTT";
-    public static final String HUDI = "HUDI";
 
     public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new HashMap<String, TaskTypeEnum>() {
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
similarity index 60%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index a74c9c3370..26685814de 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -18,26 +18,14 @@
 package org.apache.inlong.manager.common.consts;
 
 /**
- * Constants of sink type.
+ * Constants for the stream types that are supported by both StreamSource and StreamSink.
  */
-public class SinkType {
+public class StreamType {
 
-    public static final String HIVE = "HIVE";
     public static final String KAFKA = "KAFKA";
-    public static final String ICEBERG = "ICEBERG";
     public static final String HUDI = "HUDI";
-    public static final String CLICKHOUSE = "CLICKHOUSE";
-    public static final String HBASE = "HBASE";
     public static final String POSTGRESQL = "POSTGRESQL";
-    public static final String ELASTICSEARCH = "ELASTICSEARCH";
     public static final String SQLSERVER = "SQLSERVER";
-    public static final String HDFS = "HDFS";
-    public static final String GREENPLUM = "GREENPLUM";
-    public static final String MYSQL = "MYSQL";
     public static final String ORACLE = "ORACLE";
-    public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
-    public static final String DORIS = "DORIS";
-    public static final String STARROCKS = "STARROCKS";
-    public static final String KUDU = "KUDU";
-    public static final String REDIS = "REDIS";
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
similarity index 58%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
index 35c447ec7e..6e95425ef2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
@@ -19,30 +19,25 @@ package org.apache.inlong.manager.pojo.sort.node;
 
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.sort.node.extract.HudiProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.KafkaProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.MongoProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.MysqlBinlogProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.OracleProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.PostgreSqlProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.PulsarProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.RedisProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.SqlServerProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.TubeMqProvider;
-import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.KafkaProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.MongoProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.MySQLBinlogProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.OracleProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.PostgreSQLProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.PulsarProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.RedisProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.SQLServerProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.TubeMqProvider;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
- * The node provider factory.
+ * Factory of the extract node provider.
  */
-public class NodeProviderFactory {
+public class ExtractNodeProviderFactory {
 
     /**
      * The extract node provider collection
@@ -58,9 +53,9 @@ public class NodeProviderFactory {
         EXTRACT_NODE_PROVIDER_LIST.add(new PulsarProvider());
         EXTRACT_NODE_PROVIDER_LIST.add(new RedisProvider());
         EXTRACT_NODE_PROVIDER_LIST.add(new TubeMqProvider());
-        EXTRACT_NODE_PROVIDER_LIST.add(new SqlServerProvider());
-        EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSqlProvider());
-        EXTRACT_NODE_PROVIDER_LIST.add(new MysqlBinlogProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new SQLServerProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSQLProvider());
+        EXTRACT_NODE_PROVIDER_LIST.add(new MySQLBinlogProvider());
 
     }
 
@@ -77,17 +72,4 @@ public class NodeProviderFactory {
                 .orElseThrow(() -> new BusinessException(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT,
                         String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT.getMessage(), sourceType)));
     }
-
-    /**
-     * Create extract nodes from the given sources.
-     */
-    public static List<ExtractNode> createExtractNodes(List<StreamSource> sourceInfos) {
-        if (CollectionUtils.isEmpty(sourceInfos)) {
-            return Lists.newArrayList();
-        }
-        return sourceInfos.stream().map(v -> {
-            String sourceType = v.getSourceType();
-            return getExtractNodeProvider(sourceType).createNode(v);
-        }).collect(Collectors.toList());
-    }
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/LoadNodeProviderFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/LoadNodeProviderFactory.java
new file mode 100644
index 0000000000..1df97b77fb
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/LoadNodeProviderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.ClickHouseProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.DorisProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.ElasticsearchProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.GreenplumProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HBaseProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HDFSProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HiveProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.IcebergProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.KafkaProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.KuduProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.MySQLProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.OracleProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.PostgreSQLProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.RedisProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.SQLServerProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.StarRocksProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.TDSQLPostgreSQLProvider;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory of the load node provider.
+ */
+public class LoadNodeProviderFactory {
+
+    /**
+     * The load node provider collection
+     */
+    private static final List<LoadNodeProvider> LOAD_NODE_PROVIDER_LIST = new ArrayList<>();
+
+    static {
+        // Providers that parse sink info into the LoadNode that Sort needs
+        LOAD_NODE_PROVIDER_LIST.add(new KafkaProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new ClickHouseProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new DorisProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new ElasticsearchProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new GreenplumProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new HBaseProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new HDFSProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new HiveProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new HudiProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new IcebergProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new KuduProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new MySQLProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new OracleProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new PostgreSQLProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new RedisProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new SQLServerProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new StarRocksProvider());
+        LOAD_NODE_PROVIDER_LIST.add(new TDSQLPostgreSQLProvider());
+    }
+
+    /**
+     * Get load node provider
+     *
+     * @param sinkType the specified sink type
+     * @return the load node provider
+     */
+    public static LoadNodeProvider getLoadNodeProvider(String sinkType) {
+        return LOAD_NODE_PROVIDER_LIST.stream()
+                .filter(inst -> inst.accept(sinkType))
+                .findFirst()
+                .orElseThrow(() -> new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                        String.format(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage(), sinkType)));
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
new file mode 100644
index 0000000000..cc6a545490
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The node factory
+ */
+public class NodeFactory {
+
+    /**
+     * Create extract nodes from the given sources.
+     */
+    public static List<ExtractNode> createExtractNodes(List<StreamSource> sourceInfos) {
+        if (CollectionUtils.isEmpty(sourceInfos)) {
+            return Lists.newArrayList();
+        }
+        return sourceInfos.stream().map(v -> {
+            String sourceType = v.getSourceType();
+            return ExtractNodeProviderFactory.getExtractNodeProvider(sourceType).createExtractNode(v);
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * Create load nodes from the given sinks.
+     */
+    public static List<LoadNode> createLoadNodes(List<StreamSink> sinkInfos,
+            Map<String, StreamField> constantFieldMap) {
+        if (CollectionUtils.isEmpty(sinkInfos)) {
+            return Lists.newArrayList();
+        }
+        return sinkInfos.stream().map(v -> {
+            String sinkType = v.getSinkType();
+            return LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(v, constantFieldMap);
+        }).collect(Collectors.toList());
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
similarity index 90%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index aeed2df4c8..c5aea94d08 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node;
+package org.apache.inlong.manager.pojo.sort.node.base;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
@@ -43,30 +43,22 @@ import java.util.stream.Collectors;
  */
 public interface ExtractNodeProvider extends NodeProvider {
 
-    /**
-     * Determines whether the current instance matches the specified type.
-     *
-     * @param sourceType the specified source type
-     * @return whether the current instance matches the specified type
-     */
-    Boolean accept(String sourceType);
-
     /**
      * Create extract node by stream node info
      *
      * @param nodeInfo stream node info
      * @return the extract node
      */
-    ExtractNode createNode(StreamNode nodeInfo);
+    ExtractNode createExtractNode(StreamNode nodeInfo);
 
     /**
-     * Parse FieldInfos
+     * Parse StreamFieldInfos
      *
      * @param streamFields The stream fields
      * @param nodeId The node id
      * @return FieldInfo list
      */
-    default List<FieldInfo> parseFieldInfos(List<StreamField> streamFields, String nodeId) {
+    default List<FieldInfo> parseStreamFieldInfos(List<StreamField> streamFields, String nodeId) {
         // Filter constant fields
         return streamFields.stream().filter(s -> Objects.isNull(s.getFieldValue()))
                 .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
new file mode 100644
index 0000000000..2302c15943
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.base;
+
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Interface of the load node provider
+ */
+public interface LoadNodeProvider extends NodeProvider {
+
+    /**
+     * Create load node by stream node info
+     *
+     * @param nodeInfo stream node info
+     * @param constantFieldMap the constant field map
+     * @return the load node
+     */
+    LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap);
+
+    /**
+     * Parse FieldInfos
+     *
+     * @param sinkFields The stream sink fields
+     * @param nodeId The node id
+     * @return FieldInfo list
+     */
+    default List<FieldInfo> parseSinkFieldInfos(List<SinkField> sinkFields, String nodeId) {
+        return sinkFields.stream().map(field -> FieldInfoUtils.parseSinkFieldInfo(field, nodeId))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Parse the fields of a data sink into field relations.
+     */
+    default List<FieldRelation> parseSinkFields(List<SinkField> fieldList,
+            Map<String, StreamField> constantFieldMap) {
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return Lists.newArrayList();
+        }
+        return fieldList.stream()
+                .filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
+                .map(field -> {
+                    FieldInfo outputField = new FieldInfo(field.getFieldName(),
+                            FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
+                    FunctionParam inputField;
+                    String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
+                    StreamField constantField = constantFieldMap.get(fieldKey);
+                    if (constantField != null) {
+                        if (outputField.getFormatInfo() != null
+                                && outputField.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
+                            inputField = new StringConstantParam(constantField.getFieldValue());
+                        } else {
+                            inputField = new ConstantParam(constantField.getFieldValue());
+                        }
+                    } else if (FieldType.FUNCTION.name().equalsIgnoreCase(field.getSourceFieldType())) {
+                        inputField = new CustomFunction(field.getSourceFieldName());
+                    } else {
+                        inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
+                                FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
+                    }
+                    return new FieldRelation(inputField, outputField);
+                }).collect(Collectors.toList());
+    }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
similarity index 82%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
index f8f7efd030..a40bd1106d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node;
+package org.apache.inlong.manager.pojo.sort.node.base;
 
 import java.util.Map;
 import java.util.Objects;
@@ -26,6 +26,14 @@ import java.util.stream.Collectors;
  */
 public interface NodeProvider {
 
+    /**
+     * Determines whether the current instance matches the specified type.
+     *
+     * @param streamType the specified type
+     * @return whether the current instance matches the specified type
+     */
+    Boolean accept(String streamType);
+
     /**
      * Parse properties
      *
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
deleted file mode 100644
index ebbd00e326..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating Hudi extract nodes.
- */
-public class HudiProvider implements ExtractNodeProvider {
-
-    @Override
-    public Boolean accept(String sourceType) {
-        return SourceType.HUDI.equals(sourceType);
-    }
-
-    @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
-        HudiSource source = (HudiSource) streamNodeInfo;
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-
-        return new HudiExtractNode(
-                source.getSourceName(),
-                source.getSourceName(),
-                fieldInfos,
-                null,
-                source.getCatalogUri(),
-                source.getWarehouse(),
-                source.getDbName(),
-                source.getTableName(),
-                CatalogType.HIVE,
-                source.getCheckIntervalInMinus(),
-                source.isReadStreamingSkipCompaction(),
-                source.getReadStartCommit(),
-                properties,
-                source.getExtList());
-    }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
deleted file mode 100644
index 88b63f28e7..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
-import org.apache.inlong.sort.protocol.node.format.Format;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating Kafka extract nodes: accepts SourceType.KAFKA and builds a KafkaExtractNode from a KafkaSource.
- */
-public class KafkaProvider implements ExtractNodeProvider {
-
-    @Override
-    public Boolean accept(String sourceType) {
-        return SourceType.KAFKA.equals(sourceType); // constant-first equals: a null sourceType yields false
-    }
-
-    @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
-        KafkaSource kafkaSource = (KafkaSource) streamNodeInfo; // unchecked downcast; any other node type fails with ClassCastException
-        List<FieldInfo> fieldInfos = parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
-        Map<String, String> properties = parseProperties(kafkaSource.getProperties());
-
-        String topic = kafkaSource.getTopic();
-        String bootstrapServers = kafkaSource.getBootstrapServers();
-
-        Format format = parsingFormat( // helper is not defined in this class
-                kafkaSource.getSerializationType(),
-                kafkaSource.isWrapWithInlongMsg(),
-                kafkaSource.getDataSeparator(),
-                kafkaSource.isIgnoreParseErrors());
-
-        KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
-        KafkaScanStartupMode startupMode; // assigned on every path of the switch below
-        switch (kafkaOffset) { // NOTE(review): a null kafkaOffset throws NullPointerException here — confirm forName never returns null
-            case EARLIEST:
-                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
-                break;
-            case SPECIFIC:
-                startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
-                break;
-            case TIMESTAMP_MILLIS:
-                startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
-                break;
-            case LATEST:
-            default: // LATEST and every offset value not listed above map to LATEST_OFFSET
-                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
-        }
-        final String primaryKey = kafkaSource.getPrimaryKey();
-        String groupId = kafkaSource.getGroupId();
-        String partitionOffset = kafkaSource.getPartitionOffsets();
-        String scanTimestampMillis = kafkaSource.getTimestampMillis();
-        return new KafkaExtractNode(kafkaSource.getSourceName(), // the source name is passed twice (presumably id and name — TODO confirm)
-                kafkaSource.getSourceName(),
-                fieldInfos,
-                null, // argument left unset (presumably the watermark field — TODO confirm against KafkaExtractNode)
-                properties,
-                topic,
-                bootstrapServers,
-                format,
-                startupMode,
-                primaryKey,
-                groupId,
-                partitionOffset,
-                scanTimestampMillis);
-    }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
deleted file mode 100644
index 18f435e89f..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
-import org.apache.inlong.manager.pojo.source.redis.RedisSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.LookupOptions;
-import org.apache.inlong.sort.protocol.enums.RedisCommand;
-import org.apache.inlong.sort.protocol.enums.RedisMode;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating Redis extract nodes: accepts SourceType.REDIS and builds a RedisExtractNode per redis mode.
- */
-public class RedisProvider implements ExtractNodeProvider {
-
-    @Override
-    public Boolean accept(String sourceType) {
-        return SourceType.REDIS.equals(sourceType); // constant-first equals: a null sourceType yields false
-    }
-
-    @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
-        RedisSource source = (RedisSource) streamNodeInfo; // unchecked downcast; any other node type fails with ClassCastException
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-
-        RedisMode redisMode = RedisMode.forName(source.getRedisMode());
-        switch (redisMode) { // NOTE(review): a null redisMode throws NullPointerException here — confirm forName never returns null
-            case STANDALONE: // connection arguments: host + port
-                return new RedisExtractNode(
-                        source.getSourceName(),
-                        source.getSourceName(),
-                        fieldInfos,
-                        null, // argument left unset (presumably the watermark field — TODO confirm against RedisExtractNode)
-                        properties,
-                        source.getPrimaryKey(),
-                        RedisCommand.forName(source.getCommand()),
-                        source.getHost(),
-                        source.getPort(),
-                        source.getPassword(),
-                        source.getAdditionalKey(),
-                        source.getDatabase(),
-                        source.getTimeout(),
-                        source.getSoTimeout(),
-                        source.getMaxTotal(),
-                        source.getMaxIdle(),
-                        source.getMinIdle(),
-                        parseLookupOptions(source.getLookupOptions()));
-            case SENTINEL: // connection arguments: master name + sentinels info
-                return new RedisExtractNode(
-                        source.getSourceName(),
-                        source.getSourceName(),
-                        fieldInfos,
-                        null,
-                        properties,
-                        source.getPrimaryKey(),
-                        RedisCommand.forName(source.getCommand()),
-                        source.getMasterName(),
-                        source.getSentinelsInfo(),
-                        source.getPassword(),
-                        source.getAdditionalKey(),
-                        source.getDatabase(),
-                        source.getTimeout(),
-                        source.getSoTimeout(),
-                        source.getMaxTotal(),
-                        source.getMaxIdle(),
-                        source.getMinIdle(),
-                        parseLookupOptions(source.getLookupOptions()));
-            case CLUSTER: // connection argument: cluster nodes only
-                return new RedisExtractNode(
-                        source.getSourceName(),
-                        source.getSourceName(),
-                        fieldInfos,
-                        null,
-                        properties,
-                        source.getPrimaryKey(),
-                        RedisCommand.forName(source.getCommand()),
-                        source.getClusterNodes(),
-                        source.getPassword(),
-                        source.getAdditionalKey(),
-                        source.getDatabase(),
-                        source.getTimeout(),
-                        source.getSoTimeout(),
-                        source.getMaxTotal(),
-                        source.getMaxIdle(),
-                        source.getMinIdle(),
-                        parseLookupOptions(source.getLookupOptions()));
-            default: // any other mode is rejected instead of silently falling back
-                throw new IllegalArgumentException(String.format("Unsupported redis-mode=%s for Inlong", redisMode));
-        }
-    }
-
-    /**
-     * Convert the manager-side RedisLookupOptions into the sort-protocol LookupOptions.
-     *
-     * @param options RedisLookupOptions, may be null
-     * @return LookupOptions carrying the cache max rows, cache TTL, max retries and async flag; null when options is null
-     */
-    private static LookupOptions parseLookupOptions(RedisLookupOptions options) {
-        if (options == null) {
-            return null;
-        }
-        return new LookupOptions(options.getLookupCacheMaxRows(), options.getLookupCacheTtl(),
-                options.getLookupMaxRetries(), options.getLookupAsync());
-    }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
deleted file mode 100644
index f9ca5f985a..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating SQLServer extract nodes: accepts SourceType.SQLSERVER and builds a SqlServerExtractNode.
- */
-public class SqlServerProvider implements ExtractNodeProvider {
-
-    @Override
-    public Boolean accept(String sourceType) {
-        return SourceType.SQLSERVER.equals(sourceType); // constant-first equals: a null sourceType yields false
-    }
-
-    @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
-        SQLServerSource source = (SQLServerSource) streamNodeInfo; // unchecked downcast; any other node type fails with ClassCastException
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
-        Map<String, String> properties = parseProperties(source.getProperties());
-
-        return new SqlServerExtractNode(
-                source.getSourceName(), // the source name is passed twice (presumably node id and node name — TODO confirm)
-                source.getSourceName(),
-                fieldInfos,
-                null, // argument left unset (presumably the watermark field — TODO confirm against SqlServerExtractNode)
-                properties,
-                source.getPrimaryKey(),
-                source.getHostname(),
-                source.getPort(),
-                source.getUsername(),
-                source.getPassword(),
-                source.getDatabase(),
-                source.getSchemaName(),
-                source.getTableName(),
-                source.getServerTimezone());
-    }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ClickHouseProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ClickHouseProvider.java
new file mode 100644
index 0000000000..3838633957
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ClickHouseProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating ClickHouse load nodes: accepts SinkType.CLICKHOUSE and builds a ClickHouseLoadNode.
+ */
+public class ClickHouseProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.CLICKHOUSE.equals(sinkType); // constant-first equals: a null sinkType yields false
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        ClickHouseSink streamSink = (ClickHouseSink) nodeInfo; // unchecked downcast; any other node type fails with ClassCastException
+        Map<String, String> properties = parseProperties(streamSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(streamSink.getSinkFieldList(), streamSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
+        return new ClickHouseLoadNode(
+                streamSink.getSinkName(), // the sink name is passed twice (presumably node id and node name — TODO confirm)
+                streamSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null, // three arguments left unset (presumably filters / filter strategy / sink parallelism — TODO confirm)
+                null,
+                null,
+                properties,
+                streamSink.getDbName() + "." + streamSink.getTableName(), // passed as "<dbName>.<tableName>"
+                streamSink.getJdbcUrl() + "/" + streamSink.getDbName(), // NOTE(review): assumes jdbcUrl has no trailing "/" or database — verify
+                streamSink.getUsername(),
+                streamSink.getPassword(),
+                streamSink.getPrimaryKey());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
new file mode 100644
index 0000000000..2586895f8a
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Doris load nodes: accepts SinkType.DORIS and builds a DorisLoadNode from a DorisSink.
+ */
+public class DorisProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.DORIS.equals(sinkType); // constant-first equals: a null sinkType yields false
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        DorisSink dorisSink = (DorisSink) nodeInfo; // unchecked downcast; any other node type fails with ClassCastException
+        Map<String, String> properties = parseProperties(dorisSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
+        Format format = null; // stays null unless multiple-sink mode is enabled and a non-blank format is configured
+        if (dorisSink.getSinkMultipleEnable() != null && dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank(
+                dorisSink.getSinkMultipleFormat())) { // the null check guards the Boolean auto-unboxing
+            DataTypeEnum dataType = DataTypeEnum.forType(dorisSink.getSinkMultipleFormat());
+            switch (dataType) { // only CANAL and DEBEZIUM_JSON are supported as multiple-sink formats
+                case CANAL:
+                    format = new CanalJsonFormat();
+                    break;
+                case DEBEZIUM_JSON:
+                    format = new DebeziumJsonFormat();
+                    break;
+                default:
+                    throw new IllegalArgumentException(String.format("Unsupported dataType=%s for doris", dataType));
+            }
+        }
+        return new DorisLoadNode(
+                dorisSink.getSinkName(), // the sink name is passed twice (presumably node id and node name — TODO confirm)
+                dorisSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null, // three arguments left unset (presumably filters / filter strategy / sink parallelism — TODO confirm)
+                null,
+                null,
+                properties,
+                dorisSink.getFeNodes(),
+                dorisSink.getUsername(),
+                dorisSink.getPassword(),
+                dorisSink.getTableIdentifier(),
+                null, // argument left unset; its meaning is not visible here — TODO confirm against DorisLoadNode
+                dorisSink.getSinkMultipleEnable(),
+                format,
+                dorisSink.getDatabasePattern(),
+                dorisSink.getTablePattern());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
new file mode 100644
index 0000000000..5072f9ac46
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Elasticsearch load nodes: accepts SinkType.ELASTICSEARCH and builds an ElasticsearchLoadNode.
+ */
+public class ElasticsearchProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.ELASTICSEARCH.equals(sinkType); // constant-first equals: a null sinkType yields false
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        ElasticsearchSink elasticsearchSink = (ElasticsearchSink) nodeInfo; // unchecked downcast; other node types fail with ClassCastException
+        Map<String, String> properties = parseProperties(elasticsearchSink.getProperties());
+        List<SinkField> sinkFieldList = elasticsearchSink.getSinkFieldList();
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(sinkFieldList, elasticsearchSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(sinkFieldList, constantFieldMap);
+        Format format = null; // stays null unless multiple-sink mode is enabled and a non-blank format is configured
+        if (elasticsearchSink.getSinkMultipleEnable() != null && elasticsearchSink.getSinkMultipleEnable()
+                && StringUtils.isNotBlank(
+                        elasticsearchSink.getSinkMultipleFormat())) { // the null check guards the Boolean auto-unboxing
+            DataTypeEnum dataType = DataTypeEnum.forType(elasticsearchSink.getSinkMultipleFormat());
+            switch (dataType) { // only CANAL and DEBEZIUM_JSON are supported as multiple-sink formats
+                case CANAL:
+                    format = new CanalJsonFormat();
+                    break;
+                case DEBEZIUM_JSON:
+                    format = new DebeziumJsonFormat();
+                    break;
+                default:
+                    throw new IllegalArgumentException(
+                            String.format("Unsupported dataType=%s for elasticsearch", dataType));
+            }
+        }
+        return new ElasticsearchLoadNode(
+                elasticsearchSink.getSinkName(), // the sink name is passed twice (presumably node id and node name — TODO confirm)
+                elasticsearchSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null, // three arguments left unset (presumably filters / filter strategy / sink parallelism — TODO confirm)
+                null,
+                null,
+                properties,
+                elasticsearchSink.getIndexName(),
+                elasticsearchSink.getHosts(),
+                elasticsearchSink.getUsername(),
+                elasticsearchSink.getPassword(),
+                elasticsearchSink.getDocumentType(),
+                elasticsearchSink.getPrimaryKey(),
+                elasticsearchSink.getEsVersion(),
+                elasticsearchSink.getSinkMultipleEnable(),
+                format,
+                elasticsearchSink.getIndexPattern());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/GreenplumProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/GreenplumProvider.java
new file mode 100644
index 0000000000..df5618bce0
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/GreenplumProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Greenplum load nodes: accepts SinkType.GREENPLUM and builds a GreenplumLoadNode.
+ */
+public class GreenplumProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.GREENPLUM.equals(sinkType); // constant-first equals: a null sinkType yields false
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        GreenplumSink greenplumSink = (GreenplumSink) nodeInfo; // unchecked downcast; other node types fail with ClassCastException
+        Map<String, String> properties = parseProperties(greenplumSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(greenplumSink.getSinkFieldList(), greenplumSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(greenplumSink.getSinkFieldList(), constantFieldMap);
+        return new GreenplumLoadNode(
+                greenplumSink.getSinkName(), // the sink name is passed twice (presumably node id and node name — TODO confirm)
+                greenplumSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null, // three arguments left unset (presumably filters / filter strategy / sink parallelism — TODO confirm)
+                null,
+                null,
+                properties,
+                greenplumSink.getJdbcUrl(), // the JDBC URL is passed through unchanged
+                greenplumSink.getUsername(),
+                greenplumSink.getPassword(),
+                greenplumSink.getTableName(),
+                greenplumSink.getPrimaryKey());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HBaseProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HBaseProvider.java
new file mode 100644
index 0000000000..b00ae617cf
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HBaseProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating HBase load nodes: accepts SinkType.HBASE and builds a HbaseLoadNode from a HBaseSink.
+ */
+public class HBaseProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.HBASE.equals(sinkType); // constant-first equals: a null sinkType yields false
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        HBaseSink hbaseSink = (HBaseSink) nodeInfo; // unchecked downcast; any other node type fails with ClassCastException
+        Map<String, String> properties = parseProperties(hbaseSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(hbaseSink.getSinkFieldList(), hbaseSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(hbaseSink.getSinkFieldList(), constantFieldMap);
+        return new HbaseLoadNode(
+                hbaseSink.getSinkName(), // the sink name is passed twice (presumably node id and node name — TODO confirm)
+                hbaseSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                Lists.newArrayList(), // a new empty list is passed here (presumably the filters — TODO confirm)
+                null, // two arguments left unset (presumably filter strategy / sink parallelism — TODO confirm)
+                null,
+                properties,
+                hbaseSink.getTableName(),
+                hbaseSink.getNamespace(),
+                hbaseSink.getZkQuorum(),
+                hbaseSink.getRowKey(),
+                hbaseSink.getBufferFlushMaxSize(),
+                hbaseSink.getZkNodeParent(),
+                hbaseSink.getBufferFlushMaxRows(),
+                hbaseSink.getBufferFlushInterval());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HDFSProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HDFSProvider.java
new file mode 100644
index 0000000000..94d5415043
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HDFSProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Load node provider for HDFS: accepts the {@code SinkType.HDFS} sink type and turns an
+ * {@link HDFSSink} into a {@link FileSystemLoadNode}.
+ */
+public class HDFSProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.HDFS.equals(sinkType);
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        final HDFSSink sink = (HDFSSink) nodeInfo;
+        final Map<String, String> props = parseProperties(sink.getProperties());
+        final List<FieldInfo> fields = parseSinkFieldInfos(sink.getSinkFieldList(), sink.getSinkName());
+        final List<FieldRelation> relations = parseSinkFields(sink.getSinkFieldList(), constantFieldMap);
+        // An absent or empty partition list yields an empty (non-null) list of partition fields.
+        final List<FieldInfo> partitions = CollectionUtils.isEmpty(sink.getPartitionFieldList())
+                ? Lists.newArrayList()
+                : sink.getPartitionFieldList().stream()
+                        .map(partition -> new FieldInfo(partition.getFieldName(), sink.getSinkName(),
+                                FieldInfoUtils.convertFieldFormat(partition.getFieldType(),
+                                        partition.getFieldFormat())))
+                        .collect(Collectors.toList());
+
+        return new FileSystemLoadNode(
+                sink.getSinkName(), sink.getSinkName(),
+                fields,
+                relations,
+                Lists.newArrayList(),
+                sink.getDataPath(),
+                sink.getFileFormat(),
+                null,
+                props,
+                partitions,
+                sink.getServerTimeZone());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HiveProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HiveProvider.java
new file mode 100644
index 0000000000..6595409def
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HiveProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Load node provider for Hive: accepts the {@code SinkType.HIVE} sink type and turns a
+ * {@link HiveSink} into a {@link HiveLoadNode}.
+ */
+public class HiveProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.HIVE.equals(sinkType);
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        final HiveSink sink = (HiveSink) nodeInfo;
+        final Map<String, String> props = parseProperties(sink.getProperties());
+        final List<FieldInfo> fields = parseSinkFieldInfos(sink.getSinkFieldList(), sink.getSinkName());
+        final List<FieldRelation> relations = parseSinkFields(sink.getSinkFieldList(), constantFieldMap);
+        // An absent or empty partition list yields an empty (non-null) list of partition fields.
+        final List<FieldInfo> partitions = CollectionUtils.isEmpty(sink.getPartitionFieldList())
+                ? Lists.newArrayList()
+                : sink.getPartitionFieldList().stream()
+                        .map(partition -> new FieldInfo(partition.getFieldName(), sink.getSinkName(),
+                                FieldInfoUtils.convertFieldFormat(partition.getFieldType(),
+                                        partition.getFieldFormat())))
+                        .collect(Collectors.toList());
+        return new HiveLoadNode(
+                sink.getSinkName(), sink.getSinkName(),
+                fields,
+                relations,
+                Lists.newArrayList(),
+                null,
+                null,
+                props,
+                null,
+                sink.getDbName(),
+                sink.getTableName(),
+                sink.getHiveConfDir(),
+                sink.getHiveVersion(),
+                null,
+                partitions);
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HudiProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HudiProvider.java
new file mode 100644
index 0000000000..24dd749e41
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HudiProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HudiConstant;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provider for Hudi nodes in both directions: accepts the {@code StreamType.HUDI} stream type, turns a
+ * {@link HudiSource} into a {@link HudiExtractNode} and a {@link HudiSink} into a {@link HudiLoadNode}.
+ */
+public class HudiProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String streamType) {
+        return StreamType.HUDI.equals(streamType);
+    }
+
+    @Override
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+        final HudiSource hudiSource = (HudiSource) streamNodeInfo;
+        final List<FieldInfo> fields = parseStreamFieldInfos(hudiSource.getFieldList(), hudiSource.getSourceName());
+        final Map<String, String> props = parseProperties(hudiSource.getProperties());
+        // The catalog type passed below is always CatalogType.HIVE; it is not read from the source.
+        return new HudiExtractNode(
+                hudiSource.getSourceName(), hudiSource.getSourceName(),
+                fields,
+                null,
+                hudiSource.getCatalogUri(),
+                hudiSource.getWarehouse(),
+                hudiSource.getDbName(),
+                hudiSource.getTableName(),
+                CatalogType.HIVE,
+                hudiSource.getCheckIntervalInMinus(),
+                hudiSource.isReadStreamingSkipCompaction(),
+                hudiSource.getReadStartCommit(),
+                props,
+                hudiSource.getExtList());
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        final HudiSink sink = (HudiSink) nodeInfo;
+        final Map<String, String> props = parseProperties(sink.getProperties());
+        final List<FieldInfo> fields = parseSinkFieldInfos(sink.getSinkFieldList(), sink.getSinkName());
+        final List<FieldRelation> relations = parseSinkFields(sink.getSinkFieldList(), constantFieldMap);
+        // Unlike the extract node, the load node's catalog type is looked up from the sink's setting.
+        final CatalogType catalog = CatalogType.forName(sink.getCatalogType());
+
+        return new HudiLoadNode(
+                sink.getSinkName(), sink.getSinkName(),
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                props,
+                sink.getDbName(),
+                sink.getTableName(),
+                sink.getPrimaryKey(),
+                catalog,
+                sink.getCatalogUri(),
+                sink.getWarehouse(),
+                sink.getExtList(),
+                sink.getPartitionKey());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
new file mode 100644
index 0000000000..1df0c0f567
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Load node provider for Iceberg: accepts the {@code SinkType.ICEBERG} sink type and turns an
+ * {@link IcebergSink} into an {@link IcebergLoadNode}.
+ */
+public class IcebergProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.ICEBERG.equals(sinkType);
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        final IcebergSink sink = (IcebergSink) nodeInfo;
+        final Map<String, String> props = parseProperties(sink.getProperties());
+        final List<FieldInfo> fields = parseSinkFieldInfos(sink.getSinkFieldList(), sink.getSinkName());
+        final List<FieldRelation> relations = parseSinkFields(sink.getSinkFieldList(), constantFieldMap);
+        final CatalogType catalog = CatalogType.forName(sink.getCatalogType());
+
+        return new IcebergLoadNode(
+                sink.getSinkName(), sink.getSinkName(),
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                props,
+                sink.getDbName(),
+                sink.getTableName(),
+                sink.getPrimaryKey(),
+                catalog,
+                sink.getCatalogUri(),
+                sink.getWarehouse());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
new file mode 100644
index 0000000000..adb1d911e1
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Provider for Kafka nodes in both directions: accepts the {@code StreamType.KAFKA} stream type,
+ * turns a {@link KafkaSource} into a {@link KafkaExtractNode} and a {@link KafkaSink} into a
+ * {@link KafkaLoadNode}.
+ */
+public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String streamType) {
+        return StreamType.KAFKA.equals(streamType);
+    }
+
+    /**
+     * Builds the Kafka extract node; {@code streamNodeInfo} is cast to {@link KafkaSource}.
+     */
+    @Override
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+        final KafkaSource source = (KafkaSource) streamNodeInfo;
+        final List<FieldInfo> fields = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
+        final Map<String, String> props = parseProperties(source.getProperties());
+
+        final Format format = parsingFormat(
+                source.getSerializationType(),
+                source.isWrapWithInlongMsg(),
+                source.getDataSeparator(),
+                source.isIgnoreParseErrors());
+
+        // Map the auto-offset-reset setting onto a scan startup mode; others fall back to LATEST_OFFSET.
+        final KafkaScanStartupMode startupMode;
+        switch (KafkaOffset.forName(source.getAutoOffsetReset())) {
+            case EARLIEST:
+                startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
+                break;
+            case SPECIFIC:
+                startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
+                break;
+            case TIMESTAMP_MILLIS:
+                startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
+                break;
+            case LATEST:
+            default:
+                startupMode = KafkaScanStartupMode.LATEST_OFFSET;
+        }
+
+        return new KafkaExtractNode(
+                source.getSourceName(), source.getSourceName(),
+                fields,
+                null,
+                props,
+                source.getTopic(),
+                source.getBootstrapServers(),
+                format,
+                startupMode,
+                source.getPrimaryKey(),
+                source.getGroupId(),
+                source.getPartitionOffsets(),
+                source.getTimestampMillis());
+    }
+
+    /**
+     * Builds the Kafka load node; {@code nodeInfo} is cast to {@link KafkaSink}.
+     */
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        final KafkaSink sink = (KafkaSink) nodeInfo;
+        final Map<String, String> props = parseProperties(sink.getProperties());
+        final List<FieldInfo> fields = parseSinkFieldInfos(sink.getSinkFieldList(), sink.getSinkName());
+        final List<FieldRelation> relations = parseSinkFields(sink.getSinkFieldList(), constantFieldMap);
+        // The partition number, when non-empty, is parsed and used as the sink parallelism; else null.
+        final Integer sinkParallelism = StringUtils.isNotEmpty(sink.getPartitionNum())
+                ? Integer.valueOf(sink.getPartitionNum())
+                : null;
+        final Format format = newLoadFormat(DataTypeEnum.forType(sink.getSerializationType()));
+
+        return new KafkaLoadNode(
+                sink.getSinkName(), sink.getSinkName(),
+                fields,
+                relations,
+                Lists.newArrayList(),
+                null,
+                sink.getTopicName(),
+                sink.getBootstrapServers(),
+                format,
+                sinkParallelism,
+                props,
+                sink.getPrimaryKey());
+    }
+
+    /**
+     * Creates the format instance for the given data type; unsupported types are rejected.
+     */
+    private static Format newLoadFormat(DataTypeEnum dataType) {
+        switch (dataType) {
+            case CSV:
+                return new CsvFormat();
+            case AVRO:
+                return new AvroFormat();
+            case JSON:
+                return new JsonFormat();
+            case CANAL:
+                return new CanalJsonFormat();
+            case DEBEZIUM_JSON:
+                return new DebeziumJsonFormat();
+            case RAW:
+                return new RawFormat();
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KuduProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KuduProvider.java
new file mode 100644
index 0000000000..3ddd3e5e40
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KuduProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Load node provider for Kudu: accepts the {@code SinkType.KUDU} sink type and turns a
+ * {@link KuduSink} into a {@link KuduLoadNode}.
+ */
+public class KuduProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.KUDU.equals(sinkType);
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        final KuduSink sink = (KuduSink) nodeInfo;
+        final Map<String, String> props = parseProperties(sink.getProperties());
+        final List<FieldInfo> fields = parseSinkFieldInfos(sink.getSinkFieldList(), sink.getSinkName());
+        final List<FieldRelation> relations = parseSinkFields(sink.getSinkFieldList(), constantFieldMap);
+
+        return new KuduLoadNode(
+                sink.getSinkName(), sink.getSinkName(),
+                fields,
+                relations,
+                null,
+                null,
+                null,
+                props,
+                sink.getMasters(),
+                sink.getTableName(),
+                sink.getPartitionKey());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MongoProvider.java
similarity index 86%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MongoProvider.java
index 72013e2386..2b87c75221 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MongoProvider.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
 
 import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.source.mongodb.MongoDBSource;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -39,9 +39,9 @@ public class MongoProvider implements ExtractNodeProvider {
     }
 
     @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
         MongoDBSource source = (MongoDBSource) streamNodeInfo;
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
         Map<String, String> properties = parseProperties(source.getProperties());
 
         return new MongoExtractNode(
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
similarity index 87%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
index 9732a8d277..a95fc99008 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
 
 import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -31,9 +31,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * The Provider for creating MysqlBinlog extract nodes.
+ * The Provider for creating MySQLBinlog extract nodes.
  */
-public class MysqlBinlogProvider implements ExtractNodeProvider {
+public class MySQLBinlogProvider implements ExtractNodeProvider {
 
     @Override
     public Boolean accept(String sourceType) {
@@ -41,9 +41,9 @@ public class MysqlBinlogProvider implements ExtractNodeProvider {
     }
 
     @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
         MySQLBinlogSource binlogSource = (MySQLBinlogSource) streamNodeInfo;
-        List<FieldInfo> fieldInfos = parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
         Map<String, String> properties = parseProperties(binlogSource.getProperties());
 
         final String database = binlogSource.getDatabaseWhiteList();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLProvider.java
new file mode 100644
index 0000000000..8a46d4bacb
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
+import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating MySQL load nodes.
+ */
+public class MySQLProvider implements LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.MYSQL.equals(sinkType);
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        MySQLSink mysqlSink = (MySQLSink) nodeInfo;
+        Map<String, String> properties = parseProperties(mysqlSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(mysqlSink.getSinkFieldList(), mysqlSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(mysqlSink.getSinkFieldList(), constantFieldMap);
+
+        return new MySqlLoadNode(
+                mysqlSink.getSinkName(),
+                mysqlSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                Lists.newArrayList(),
+                null,
+                null,
+                properties,
+                MySQLSinkDTO.setDbNameToUrl(mysqlSink.getJdbcUrl(), mysqlSink.getDatabaseName()),
+                mysqlSink.getUsername(),
+                mysqlSink.getPassword(),
+                mysqlSink.getTableName(),
+                mysqlSink.getPrimaryKey());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OracleProvider.java
similarity index 52%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OracleProvider.java
index 72993b723a..12643e0370 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OracleProvider.java
@@ -15,16 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
 
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
 import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -32,19 +38,19 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * The Provider for creating Oracle extract nodes.
+ * The Provider for creating Oracle extract or load nodes.
  */
-public class OracleProvider implements ExtractNodeProvider {
+public class OracleProvider implements ExtractNodeProvider, LoadNodeProvider {
 
     @Override
-    public Boolean accept(String sourceType) {
-        return SourceType.ORACLE.equals(sourceType);
+    public Boolean accept(String streamType) {
+        return StreamType.ORACLE.equals(streamType);
     }
 
     @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
         OracleSource source = (OracleSource) streamNodeInfo;
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
         Map<String, String> properties = parseProperties(source.getProperties());
 
         ScanStartUpMode scanStartupMode = StringUtils.isBlank(source.getScanStartupMode())
@@ -66,4 +72,27 @@ public class OracleProvider implements ExtractNodeProvider {
                 source.getPort(),
                 scanStartupMode);
     }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        OracleSink oracleSink = (OracleSink) nodeInfo;
+        Map<String, String> properties = parseProperties(oracleSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(oracleSink.getSinkFieldList(), oracleSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(oracleSink.getSinkFieldList(), constantFieldMap);
+
+        return new OracleLoadNode(
+                oracleSink.getSinkName(),
+                oracleSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                oracleSink.getJdbcUrl(),
+                oracleSink.getUsername(),
+                oracleSink.getPassword(),
+                oracleSink.getTableName(),
+                oracleSink.getPrimaryKey());
+    }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
similarity index 50%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
index 128c30018e..0f16379d4b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
@@ -15,33 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
 
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
 import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
 
 import java.util.List;
 import java.util.Map;
 
 /**
- * The Provider for creating PostgreSql extract nodes.
+ * The Provider for creating PostgreSQL extract or load nodes.
  */
-public class PostgreSqlProvider implements ExtractNodeProvider {
+public class PostgreSQLProvider implements ExtractNodeProvider, LoadNodeProvider {
 
     @Override
-    public Boolean accept(String sourceType) {
-        return SourceType.POSTGRESQL.equals(sourceType);
+    public Boolean accept(String streamType) {
+        return StreamType.POSTGRESQL.equals(streamType);
     }
 
     @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
         PostgreSQLSource postgreSQLSource = (PostgreSQLSource) streamNodeInfo;
-        List<FieldInfo> fieldInfos = parseFieldInfos(postgreSQLSource.getFieldList(), postgreSQLSource.getSourceName());
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(postgreSQLSource.getFieldList(),
+                postgreSQLSource.getSourceName());
         Map<String, String> properties = parseProperties(postgreSQLSource.getProperties());
 
         return new PostgresExtractNode(postgreSQLSource.getSourceName(),
@@ -61,4 +68,28 @@ public class PostgreSqlProvider implements ExtractNodeProvider {
                 postgreSQLSource.getServerTimeZone(),
                 postgreSQLSource.getScanStartupMode());
     }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        PostgreSQLSink postgreSQLSink = (PostgreSQLSink) nodeInfo;
+        Map<String, String> properties = parseProperties(postgreSQLSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(postgreSQLSink.getSinkFieldList(),
+                postgreSQLSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(postgreSQLSink.getSinkFieldList(), constantFieldMap);
+
+        return new PostgresLoadNode(
+                postgreSQLSink.getSinkName(),
+                postgreSQLSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                postgreSQLSink.getJdbcUrl(),
+                postgreSQLSink.getUsername(),
+                postgreSQLSink.getPassword(),
+                postgreSQLSink.getDbName() + "." + postgreSQLSink.getTableName(),
+                postgreSQLSink.getPrimaryKey());
+    }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
similarity index 90%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 14cde851b7..da20e8b203 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
 
 import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -43,9 +43,9 @@ public class PulsarProvider implements ExtractNodeProvider {
     }
 
     @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
         PulsarSource pulsarSource = (PulsarSource) streamNodeInfo;
-        List<FieldInfo> fieldInfos = parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
         Map<String, String> properties = parseProperties(pulsarSource.getProperties());
 
         String fullTopicName =
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/RedisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/RedisProvider.java
new file mode 100644
index 0000000000..8bc083007a
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/RedisProvider.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
+import org.apache.inlong.manager.pojo.source.redis.RedisSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.LookupOptions;
+import org.apache.inlong.sort.protocol.enums.RedisCommand;
+import org.apache.inlong.sort.protocol.enums.RedisMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Redis extract or load nodes.
+ */
+public class RedisProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.REDIS.equals(sourceType);
+    }
+
+    @Override
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+        RedisSource source = (RedisSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
+        Map<String, String> properties = parseProperties(source.getProperties());
+
+        RedisMode redisMode = RedisMode.forName(source.getRedisMode());
+        switch (redisMode) {
+            case STANDALONE:
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getHost(),
+                        source.getPort(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            case SENTINEL:
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getMasterName(),
+                        source.getSentinelsInfo(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            case CLUSTER:
+                return new RedisExtractNode(
+                        source.getSourceName(),
+                        source.getSourceName(),
+                        fieldInfos,
+                        null,
+                        properties,
+                        source.getPrimaryKey(),
+                        RedisCommand.forName(source.getCommand()),
+                        source.getClusterNodes(),
+                        source.getPassword(),
+                        source.getAdditionalKey(),
+                        source.getDatabase(),
+                        source.getTimeout(),
+                        source.getSoTimeout(),
+                        source.getMaxTotal(),
+                        source.getMaxIdle(),
+                        source.getMinIdle(),
+                        parseLookupOptions(source.getLookupOptions()));
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported redis-mode=%s for Inlong", redisMode));
+        }
+    }
+
+    /**
+     * Parse LookupOptions
+     *
+     * @param options RedisLookupOptions
+     * @return LookupOptions
+     */
+    private static LookupOptions parseLookupOptions(RedisLookupOptions options) {
+        if (options == null) {
+            return null;
+        }
+        return new LookupOptions(options.getLookupCacheMaxRows(), options.getLookupCacheTtl(),
+                options.getLookupMaxRetries(), options.getLookupAsync());
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        RedisSink redisSink = (RedisSink) nodeInfo;
+        Map<String, String> properties = parseProperties(redisSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(redisSink.getSinkFieldList(), redisSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(redisSink.getSinkFieldList(), constantFieldMap);
+
+        String clusterMode = redisSink.getClusterMode();
+        String dataType = redisSink.getDataType();
+        String schemaMapMode = redisSink.getSchemaMapMode();
+        String host = redisSink.getHost();
+        Integer port = redisSink.getPort();
+        String clusterNodes = redisSink.getClusterNodes();
+        String masterName = redisSink.getMasterName();
+        String sentinelsInfo = redisSink.getSentinelsInfo();
+        Integer database = redisSink.getDatabase();
+        String password = redisSink.getPassword();
+        Integer ttl = redisSink.getTtl();
+        Integer timeout = redisSink.getTimeout();
+        Integer soTimeout = redisSink.getSoTimeout();
+        Integer maxTotal = redisSink.getMaxTotal();
+        Integer maxIdle = redisSink.getMaxIdle();
+        Integer minIdle = redisSink.getMinIdle();
+        Integer maxRetries = redisSink.getMaxRetries();
+
+        Format format = parsingDataFormat(
+                redisSink.getFormatDataType(),
+                false,
+                redisSink.getFormatDataSeparator(),
+                false);
+
+        return new RedisLoadNode(
+                redisSink.getSinkName(),
+                redisSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                clusterMode,
+                dataType,
+                schemaMapMode,
+                host,
+                port,
+                clusterNodes,
+                masterName,
+                sentinelsInfo,
+                database,
+                password,
+                ttl,
+                format,
+                timeout,
+                soTimeout,
+                maxTotal,
+                maxIdle,
+                minIdle,
+                maxRetries);
+    }
+
+    /**
+     * Parse format
+     *
+     * @param formatName data serialization, support: csv, json, canal, avro, etc
+     * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
+     * @param separatorStr the separator of data content
+     * @param ignoreParseErrors whether ignore deserialization error data
+     * @return the format for serialized content
+     */
+    private Format parsingDataFormat(
+            String formatName,
+            boolean wrapWithInlongMsg,
+            String separatorStr,
+            boolean ignoreParseErrors) {
+        Format format;
+        DataTypeEnum dataType = DataTypeEnum.forType(formatName);
+        switch (dataType) {
+            case CSV:
+                if (StringUtils.isNumeric(separatorStr)) {
+                    char dataSeparator = (char) Integer.parseInt(separatorStr);
+                    separatorStr = Character.toString(dataSeparator);
+                }
+                CsvFormat csvFormat = new CsvFormat(separatorStr);
+                csvFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = csvFormat;
+                break;
+            case AVRO:
+                format = new AvroFormat();
+                break;
+            case JSON:
+                JsonFormat jsonFormat = new JsonFormat();
+                jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = jsonFormat;
+                break;
+            case CANAL:
+                format = new CanalJsonFormat();
+                break;
+            case DEBEZIUM_JSON:
+                DebeziumJsonFormat debeziumJsonFormat = new DebeziumJsonFormat();
+                debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+                format = debeziumJsonFormat;
+                break;
+            case RAW:
+                format = new RawFormat();
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
+        }
+        if (wrapWithInlongMsg) {
+            Format innerFormat = format;
+            format = new InLongMsgFormat(innerFormat, false);
+        }
+        return format;
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/SQLServerProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/SQLServerProvider.java
new file mode 100644
index 0000000000..27aa06c92d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/SQLServerProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating SQLServer extract or load nodes.
+ */
+public class SQLServerProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+    @Override
+    public Boolean accept(String streamType) {
+        return StreamType.SQLSERVER.equals(streamType);
+    }
+
+    @Override
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+        SQLServerSource source = (SQLServerSource) streamNodeInfo;
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
+        Map<String, String> properties = parseProperties(source.getProperties());
+
+        return new SqlServerExtractNode(
+                source.getSourceName(),
+                source.getSourceName(),
+                fieldInfos,
+                null,
+                properties,
+                source.getPrimaryKey(),
+                source.getHostname(),
+                source.getPort(),
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase(),
+                source.getSchemaName(),
+                source.getTableName(),
+                source.getServerTimezone());
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        SQLServerSink sqlServerSink = (SQLServerSink) nodeInfo;
+        Map<String, String> properties = parseProperties(sqlServerSink.getProperties());
+        List<FieldInfo> fieldInfos = parseSinkFieldInfos(sqlServerSink.getSinkFieldList(), sqlServerSink.getSinkName());
+        List<FieldRelation> fieldRelations = parseSinkFields(sqlServerSink.getSinkFieldList(), constantFieldMap);
+
+        return new SqlServerLoadNode(
+                sqlServerSink.getSinkName(),
+                sqlServerSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                null,
+                null,
+                null,
+                properties,
+                sqlServerSink.getJdbcUrl(),
+                sqlServerSink.getUsername(),
+                sqlServerSink.getPassword(),
+                sqlServerSink.getSchemaName(),
+                sqlServerSink.getTableName(),
+                sqlServerSink.getPrimaryKey());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
new file mode 100644
index 0000000000..588671d370
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating StarRocks load nodes.
+ */
+public class StarRocksProvider implements LoadNodeProvider {
+
+    /** Accepts only the {@link SinkType#STARROCKS} sink type. */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.STARROCKS.equals(sinkType);
+    }
+
+    /**
+     * Converts the given node, which is cast to {@link StarRocksSink}, into a {@link StarRocksLoadNode}.
+     * The format argument stays {@code null} unless {@code resolveMultipleFormat} selects one.
+     */
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        StarRocksSink sink = (StarRocksSink) nodeInfo;
+        String sinkName = sink.getSinkName();
+        Map<String, String> props = parseProperties(sink.getProperties());
+        List<FieldInfo> fields = parseSinkFieldInfos(sink.getSinkFieldList(), sinkName);
+        List<FieldRelation> relations = parseSinkFields(sink.getSinkFieldList(), constantFieldMap);
+        Format format = resolveMultipleFormat(sink);
+
+        return new StarRocksLoadNode(
+                sinkName, sinkName, fields, relations,
+                null, null, null, props,
+                sink.getJdbcUrl(), sink.getLoadUrl(), sink.getUsername(), sink.getPassword(),
+                sink.getDatabaseName(), sink.getTableName(), sink.getPrimaryKey(),
+                sink.getSinkMultipleEnable(), format,
+                sink.getDatabasePattern(), sink.getTablePattern());
+    }
+
+    /**
+     * Picks the format that matches the sink's multiple-format name.
+     *
+     * @return {@code null} when the multiple-enable flag is not {@code TRUE} or the format name is blank
+     * @throws IllegalArgumentException when the resolved data type is neither CANAL nor DEBEZIUM_JSON
+     */
+    private static Format resolveMultipleFormat(StarRocksSink sink) {
+        if (!Boolean.TRUE.equals(sink.getSinkMultipleEnable())
+                || StringUtils.isBlank(sink.getSinkMultipleFormat())) {
+            return null;
+        }
+        DataTypeEnum dataType = DataTypeEnum.forType(sink.getSinkMultipleFormat());
+        switch (dataType) {
+            case CANAL:
+                return new CanalJsonFormat();
+            case DEBEZIUM_JSON:
+                return new DebeziumJsonFormat();
+            default:
+                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for StarRocks", dataType));
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TDSQLPostgreSQLProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TDSQLPostgreSQLProvider.java
new file mode 100644
index 0000000000..0f9f797199
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TDSQLPostgreSQLProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating TDSQLPostgreSQL load nodes.
+ */
+public class TDSQLPostgreSQLProvider implements LoadNodeProvider {
+
+    /** Accepts only the {@link SinkType#TDSQLPOSTGRESQL} sink type. */
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.TDSQLPOSTGRESQL.equals(sinkType);
+    }
+
+    /**
+     * Converts the given node, which is cast to {@link TDSQLPostgreSQLSink}, into a
+     * {@link TDSQLPostgresLoadNode} built with the {@code schemaName + "." + tableName} string.
+     *
+     * @param constantFieldMap forwarded unchanged to {@code parseSinkFields}
+     */
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+        TDSQLPostgreSQLSink sink = (TDSQLPostgreSQLSink) nodeInfo;
+        String sinkName = sink.getSinkName();
+        Map<String, String> props = parseProperties(sink.getProperties());
+        List<SinkField> sinkFields = sink.getSinkFieldList();
+        List<FieldInfo> fields = parseSinkFieldInfos(sinkFields, sinkName);
+        List<FieldRelation> relations = parseSinkFields(sinkFields, constantFieldMap);
+        String qualifiedTable = sink.getSchemaName() + "." + sink.getTableName();
+
+        return new TDSQLPostgresLoadNode(
+                sinkName, sinkName, fields, relations,
+                null, null, null, props,
+                sink.getJdbcUrl(), sink.getUsername(), sink.getPassword(),
+                qualifiedTable, sink.getPrimaryKey());
+    }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
similarity index 86%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index 2a61b385a1..72399221f8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
 
 import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
 import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -39,9 +39,9 @@ public class TubeMqProvider implements ExtractNodeProvider {
     }
 
     @Override
-    public ExtractNode createNode(StreamNode streamNodeInfo) {
+    public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
         TubeMQSource source = (TubeMQSource) streamNodeInfo;
-        List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+        List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
         Map<String, String> properties = parseProperties(source.getProperties());
 
         return new TubeMQExtractNode(
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
deleted file mode 100644
index 9209d19b5b..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ /dev/null
@@ -1,822 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.util;
-
-import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.SinkType;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.sink.SinkField;
-import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
-import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
-import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
-import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
-import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
-import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
-import org.apache.inlong.manager.pojo.sink.hive.HivePartitionField;
-import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
-import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
-import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
-import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
-import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
-import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
-import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
-import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
-import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
-import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
-import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
-import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
-import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
-import org.apache.inlong.manager.pojo.stream.StreamField;
-import org.apache.inlong.sort.formats.common.StringTypeInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.constant.HudiConstant;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
-import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.AvroFormat;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.CsvFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.Format;
-import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
-import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.node.format.RawFormat;
-import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
-import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
-import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
-import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
-import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
-import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
-import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
-import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
-import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
-import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
-import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
-import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
-import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
-import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
-import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
-import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
-import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelation;
-import org.apache.inlong.sort.protocol.transformation.FunctionParam;
-import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
-import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Util for load node info.
- */
-public class LoadNodeUtils {
-
-    /**
-     * Create nodes of data load.
-     */
-    public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks, Map<String, StreamField> fieldMap) {
-        if (CollectionUtils.isEmpty(streamSinks)) {
-            return Lists.newArrayList();
-        }
-        return streamSinks.stream()
-                .map(sink -> LoadNodeUtils.createLoadNode(sink, fieldMap))
-                .collect(Collectors.toList());
-    }
-
-    /**
-     * Create load node from the stream sink info.
-     */
-    public static LoadNode createLoadNode(StreamSink streamSink, Map<String, StreamField> constantFieldMap) {
-        List<FieldInfo> fieldInfos = streamSink.getSinkFieldList().stream()
-                .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, streamSink.getSinkName()))
-                .collect(Collectors.toList());
-        List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
-        Map<String, String> properties = streamSink.getProperties().entrySet().stream()
-                .filter(v -> Objects.nonNull(v.getValue()))
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
-        String sinkType = streamSink.getSinkType();
-        switch (sinkType) {
-            case SinkType.KAFKA:
-                return createLoadNode((KafkaSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.HIVE:
-                return createLoadNode((HiveSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.HBASE:
-                return createLoadNode((HBaseSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.POSTGRESQL:
-                return createLoadNode((PostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.CLICKHOUSE:
-                return createLoadNode((ClickHouseSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.ICEBERG:
-                return createLoadNode((IcebergSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.HUDI:
-                return createLoadNode((HudiSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.SQLSERVER:
-                return createLoadNode((SQLServerSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.ELASTICSEARCH:
-                return createLoadNode((ElasticsearchSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.HDFS:
-                return createLoadNode((HDFSSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.GREENPLUM:
-                return createLoadNode((GreenplumSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.MYSQL:
-                return createLoadNode((MySQLSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.ORACLE:
-                return createLoadNode((OracleSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.TDSQLPOSTGRESQL:
-                return createLoadNode((TDSQLPostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.DORIS:
-                return createLoadNode((DorisSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.STARROCKS:
-                return createLoadNode((StarRocksSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.KUDU:
-                return createLoadNode((KuduSink) streamSink, fieldInfos, fieldRelations, properties);
-            case SinkType.REDIS:
-                return createLoadNode((RedisSink) streamSink, fieldInfos, fieldRelations, properties);
-            default:
-                throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
-        }
-    }
-
-    /**
-     * Create load node of Kafka.
-     */
-    public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        Integer sinkParallelism = null;
-        if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
-            sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
-        }
-        DataTypeEnum dataType = DataTypeEnum.forType(kafkaSink.getSerializationType());
-        Format format;
-        switch (dataType) {
-            case CSV:
-                format = new CsvFormat();
-                break;
-            case AVRO:
-                format = new AvroFormat();
-                break;
-            case JSON:
-                format = new JsonFormat();
-                break;
-            case CANAL:
-                format = new CanalJsonFormat();
-                break;
-            case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
-                break;
-            case RAW:
-                format = new RawFormat();
-                break;
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
-        }
-
-        return new KafkaLoadNode(
-                kafkaSink.getSinkName(),
-                kafkaSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                Lists.newArrayList(),
-                null,
-                kafkaSink.getTopicName(),
-                kafkaSink.getBootstrapServers(),
-                format,
-                sinkParallelism,
-                properties,
-                kafkaSink.getPrimaryKey());
-    }
-
-    /**
-     * Create load node of Hive.
-     */
-    public static HiveLoadNode createLoadNode(HiveSink hiveSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        List<FieldInfo> partitionFields = Lists.newArrayList();
-        if (CollectionUtils.isNotEmpty(hiveSink.getPartitionFieldList())) {
-            partitionFields = hiveSink.getPartitionFieldList().stream()
-                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hiveSink.getSinkName(),
-                            FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
-                                    partitionField.getFieldFormat())))
-                    .collect(Collectors.toList());
-        }
-        return new HiveLoadNode(
-                hiveSink.getSinkName(),
-                hiveSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                Lists.newArrayList(),
-                null,
-                null,
-                properties,
-                null,
-                hiveSink.getDbName(),
-                hiveSink.getTableName(),
-                hiveSink.getHiveConfDir(),
-                hiveSink.getHiveVersion(),
-                null,
-                partitionFields);
-    }
-
-    /**
-     * Create load node of HBase.
-     */
-    public static HbaseLoadNode createLoadNode(HBaseSink hbaseSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new HbaseLoadNode(
-                hbaseSink.getSinkName(),
-                hbaseSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                Lists.newArrayList(),
-                null,
-                null,
-                properties,
-                hbaseSink.getTableName(),
-                hbaseSink.getNamespace(),
-                hbaseSink.getZkQuorum(),
-                hbaseSink.getRowKey(),
-                hbaseSink.getBufferFlushMaxSize(),
-                hbaseSink.getZkNodeParent(),
-                hbaseSink.getBufferFlushMaxRows(),
-                hbaseSink.getBufferFlushInterval());
-    }
-
-    /**
-     * Create load node of PostgreSQL.
-     */
-    public static PostgresLoadNode createLoadNode(PostgreSQLSink postgreSQLSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new PostgresLoadNode(
-                postgreSQLSink.getSinkName(),
-                postgreSQLSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                postgreSQLSink.getJdbcUrl(),
-                postgreSQLSink.getUsername(),
-                postgreSQLSink.getPassword(),
-                postgreSQLSink.getDbName() + "." + postgreSQLSink.getTableName(),
-                postgreSQLSink.getPrimaryKey());
-    }
-
-    /**
-     * Create load node of ClickHouse.
-     */
-    public static ClickHouseLoadNode createLoadNode(ClickHouseSink ckSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new ClickHouseLoadNode(
-                ckSink.getSinkName(),
-                ckSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                ckSink.getDbName() + "." + ckSink.getTableName(),
-                ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
-                ckSink.getUsername(),
-                ckSink.getPassword(),
-                ckSink.getPrimaryKey());
-    }
-
-    /**
-     * Create load node of Doris.
-     */
-    public static DorisLoadNode createLoadNode(DorisSink dorisSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        Format format = null;
-        if (dorisSink.getSinkMultipleEnable() != null && dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank(
-                dorisSink.getSinkMultipleFormat())) {
-            DataTypeEnum dataType = DataTypeEnum.forType(dorisSink.getSinkMultipleFormat());
-            switch (dataType) {
-                case CANAL:
-                    format = new CanalJsonFormat();
-                    break;
-                case DEBEZIUM_JSON:
-                    format = new DebeziumJsonFormat();
-                    break;
-                default:
-                    throw new IllegalArgumentException(String.format("Unsupported dataType=%s for doris", dataType));
-            }
-        }
-        return new DorisLoadNode(
-                dorisSink.getSinkName(),
-                dorisSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                dorisSink.getFeNodes(),
-                dorisSink.getUsername(),
-                dorisSink.getPassword(),
-                dorisSink.getTableIdentifier(),
-                null,
-                dorisSink.getSinkMultipleEnable(),
-                format,
-                dorisSink.getDatabasePattern(),
-                dorisSink.getTablePattern());
-    }
-
-    /**
-     * Create load node of StarRocks.
-     */
-    public static StarRocksLoadNode createLoadNode(StarRocksSink starRocksSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        Format format = null;
-        if (starRocksSink.getSinkMultipleEnable() != null && starRocksSink.getSinkMultipleEnable()
-                && StringUtils.isNotBlank(
-                        starRocksSink.getSinkMultipleFormat())) {
-            DataTypeEnum dataType = DataTypeEnum.forType(starRocksSink.getSinkMultipleFormat());
-            switch (dataType) {
-                case CANAL:
-                    format = new CanalJsonFormat();
-                    break;
-                case DEBEZIUM_JSON:
-                    format = new DebeziumJsonFormat();
-                    break;
-                default:
-                    throw new IllegalArgumentException(
-                            String.format("Unsupported dataType=%s for StarRocks", dataType));
-            }
-        }
-        return new StarRocksLoadNode(
-                starRocksSink.getSinkName(),
-                starRocksSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                starRocksSink.getJdbcUrl(),
-                starRocksSink.getLoadUrl(),
-                starRocksSink.getUsername(),
-                starRocksSink.getPassword(),
-                starRocksSink.getDatabaseName(),
-                starRocksSink.getTableName(),
-                starRocksSink.getPrimaryKey(),
-                starRocksSink.getSinkMultipleEnable(),
-                format,
-                starRocksSink.getDatabasePattern(),
-                starRocksSink.getTablePattern());
-    }
-
-    /**
-     * Create load node of Kudu.
-     */
-    public static KuduLoadNode createLoadNode(
-            KuduSink kuduSink,
-            List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations,
-            Map<String, String> properties) {
-        return new KuduLoadNode(
-                kuduSink.getSinkName(),
-                kuduSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                kuduSink.getMasters(),
-                kuduSink.getTableName(),
-                kuduSink.getPartitionKey());
-    }
-
-    private static LoadNode createLoadNode(
-            RedisSink redisSink,
-            List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations,
-            Map<String, String> properties) {
-        String clusterMode = redisSink.getClusterMode();
-        String dataType = redisSink.getDataType();
-        String schemaMapMode = redisSink.getSchemaMapMode();
-        String host = redisSink.getHost();
-        Integer port = redisSink.getPort();
-        String clusterNodes = redisSink.getClusterNodes();
-        String masterName = redisSink.getMasterName();
-        String sentinelsInfo = redisSink.getSentinelsInfo();
-        Integer database = redisSink.getDatabase();
-        String password = redisSink.getPassword();
-        Integer ttl = redisSink.getTtl();
-        Integer timeout = redisSink.getTimeout();
-        Integer soTimeout = redisSink.getSoTimeout();
-        Integer maxTotal = redisSink.getMaxTotal();
-        Integer maxIdle = redisSink.getMaxIdle();
-        Integer minIdle = redisSink.getMinIdle();
-        Integer maxRetries = redisSink.getMaxRetries();
-
-        Format format = parsingFormat(
-                redisSink.getFormatDataType(),
-                false,
-                redisSink.getFormatDataSeparator(),
-                false);
-
-        return new RedisLoadNode(
-                redisSink.getSinkName(),
-                redisSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                clusterMode,
-                dataType,
-                schemaMapMode,
-                host,
-                port,
-                clusterNodes,
-                masterName,
-                sentinelsInfo,
-                database,
-                password,
-                ttl,
-                format,
-                timeout,
-                soTimeout,
-                maxTotal,
-                maxIdle,
-                minIdle,
-                maxRetries);
-    }
-
-    /**
-     * Create load node of Iceberg.
-     */
-    public static IcebergLoadNode createLoadNode(IcebergSink icebergSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
-        return new IcebergLoadNode(
-                icebergSink.getSinkName(),
-                icebergSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                icebergSink.getDbName(),
-                icebergSink.getTableName(),
-                icebergSink.getPrimaryKey(),
-                catalogType,
-                icebergSink.getCatalogUri(),
-                icebergSink.getWarehouse());
-    }
-
-    /**
-     * Create load node of Hudi.
-     */
-    public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
-
-        return new HudiLoadNode(
-                hudiSink.getSinkName(),
-                hudiSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                hudiSink.getDbName(),
-                hudiSink.getTableName(),
-                hudiSink.getPrimaryKey(),
-                catalogType,
-                hudiSink.getCatalogUri(),
-                hudiSink.getWarehouse(),
-                hudiSink.getExtList(),
-                hudiSink.getPartitionKey());
-    }
-
-    /**
-     * Create load node of SQLServer.
-     */
-    public static SqlServerLoadNode createLoadNode(SQLServerSink sqlServerSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new SqlServerLoadNode(
-                sqlServerSink.getSinkName(),
-                sqlServerSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                sqlServerSink.getJdbcUrl(),
-                sqlServerSink.getUsername(),
-                sqlServerSink.getPassword(),
-                sqlServerSink.getSchemaName(),
-                sqlServerSink.getTableName(),
-                sqlServerSink.getPrimaryKey());
-    }
-
-    /**
-     * Create Elasticsearch load node
-     */
-    public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsearchSink,
-            List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        Format format = null;
-        if (elasticsearchSink.getSinkMultipleEnable() != null && elasticsearchSink.getSinkMultipleEnable()
-                && StringUtils.isNotBlank(
-                        elasticsearchSink.getSinkMultipleFormat())) {
-            DataTypeEnum dataType = DataTypeEnum.forType(elasticsearchSink.getSinkMultipleFormat());
-            switch (dataType) {
-                case CANAL:
-                    format = new CanalJsonFormat();
-                    break;
-                case DEBEZIUM_JSON:
-                    format = new DebeziumJsonFormat();
-                    break;
-                default:
-                    throw new IllegalArgumentException(
-                            String.format("Unsupported dataType=%s for elasticsearch", dataType));
-            }
-        }
-        return new ElasticsearchLoadNode(
-                elasticsearchSink.getSinkName(),
-                elasticsearchSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                elasticsearchSink.getIndexName(),
-                elasticsearchSink.getHosts(),
-                elasticsearchSink.getUsername(),
-                elasticsearchSink.getPassword(),
-                elasticsearchSink.getDocumentType(),
-                elasticsearchSink.getPrimaryKey(),
-                elasticsearchSink.getEsVersion(),
-                elasticsearchSink.getSinkMultipleEnable(),
-                format,
-                elasticsearchSink.getIndexPattern());
-    }
-
-    /**
-     * Create load node of HDFS.
-     */
-    public static FileSystemLoadNode createLoadNode(HDFSSink hdfsSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        List<FieldInfo> partitionFields = Lists.newArrayList();
-        if (CollectionUtils.isNotEmpty(hdfsSink.getPartitionFieldList())) {
-            partitionFields = hdfsSink.getPartitionFieldList().stream()
-                    .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hdfsSink.getSinkName(),
-                            FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
-                                    partitionField.getFieldFormat())))
-                    .collect(Collectors.toList());
-        }
-
-        return new FileSystemLoadNode(
-                hdfsSink.getSinkName(),
-                hdfsSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                Lists.newArrayList(),
-                hdfsSink.getDataPath(),
-                hdfsSink.getFileFormat(),
-                null,
-                properties,
-                partitionFields,
-                hdfsSink.getServerTimeZone());
-    }
-
-    /**
-     * Create greenplum load node
-     */
-    public static GreenplumLoadNode createLoadNode(GreenplumSink greenplumSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new GreenplumLoadNode(
-                greenplumSink.getSinkName(),
-                greenplumSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                greenplumSink.getJdbcUrl(),
-                greenplumSink.getUsername(),
-                greenplumSink.getPassword(),
-                greenplumSink.getTableName(),
-                greenplumSink.getPrimaryKey());
-    }
-
-    /**
-     * Create load node of MySQL.
-     */
-    public static MySqlLoadNode createLoadNode(MySQLSink mysqlSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new MySqlLoadNode(
-                mysqlSink.getSinkName(),
-                mysqlSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                Lists.newArrayList(),
-                null,
-                null,
-                properties,
-                MySQLSinkDTO.setDbNameToUrl(mysqlSink.getJdbcUrl(), mysqlSink.getDatabaseName()),
-                mysqlSink.getUsername(),
-                mysqlSink.getPassword(),
-                mysqlSink.getTableName(),
-                mysqlSink.getPrimaryKey());
-    }
-
-    /**
-     * Create load node of ORACLE.
-     */
-    public static OracleLoadNode createLoadNode(OracleSink oracleSink, List<FieldInfo> fieldInfos,
-            List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new OracleLoadNode(
-                oracleSink.getSinkName(),
-                oracleSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                oracleSink.getJdbcUrl(),
-                oracleSink.getUsername(),
-                oracleSink.getPassword(),
-                oracleSink.getTableName(),
-                oracleSink.getPrimaryKey());
-    }
-
-    /**
-     * Create load node of TDSQLPostgreSQL.
-     */
-    public static TDSQLPostgresLoadNode createLoadNode(TDSQLPostgreSQLSink tdsqlPostgreSQLSink,
-            List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
-        return new TDSQLPostgresLoadNode(
-                tdsqlPostgreSQLSink.getSinkName(),
-                tdsqlPostgreSQLSink.getSinkName(),
-                fieldInfos,
-                fieldRelations,
-                null,
-                null,
-                null,
-                properties,
-                tdsqlPostgreSQLSink.getJdbcUrl(),
-                tdsqlPostgreSQLSink.getUsername(),
-                tdsqlPostgreSQLSink.getPassword(),
-                tdsqlPostgreSQLSink.getSchemaName() + "." + tdsqlPostgreSQLSink.getTableName(),
-                tdsqlPostgreSQLSink.getPrimaryKey());
-    }
-
-    /**
-     * Parse information field of data sink.
-     */
-    public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList,
-            Map<String, StreamField> constantFieldMap) {
-        if (CollectionUtils.isEmpty(fieldList)) {
-            return Lists.newArrayList();
-        }
-        return fieldList.stream()
-                .filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
-                .map(field -> {
-                    FieldInfo outputField = new FieldInfo(field.getFieldName(),
-                            FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
-                    FunctionParam inputField;
-                    String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
-                    StreamField constantField = constantFieldMap.get(fieldKey);
-                    if (constantField != null) {
-                        if (outputField.getFormatInfo() != null
-                                && outputField.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
-                            inputField = new StringConstantParam(constantField.getFieldValue());
-                        } else {
-                            inputField = new ConstantParam(constantField.getFieldValue());
-                        }
-                    } else if (FieldType.FUNCTION.name().equalsIgnoreCase(field.getSourceFieldType())) {
-                        inputField = new CustomFunction(field.getSourceFieldName());
-                    } else {
-                        inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
-                                FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
-                    }
-                    return new FieldRelation(inputField, outputField);
-                }).collect(Collectors.toList());
-    }
-
-    /**
-     * Check the validation of Hive partition field.
-     */
-    public static void checkPartitionField(List<SinkField> fieldList, List<HivePartitionField> partitionList) {
-        if (CollectionUtils.isEmpty(partitionList)) {
-            return;
-        }
-        if (CollectionUtils.isEmpty(fieldList)) {
-            throw new BusinessException(ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY);
-        }
-
-        Map<String, SinkField> sinkFieldMap = new HashMap<>(fieldList.size());
-        fieldList.forEach(field -> sinkFieldMap.put(field.getFieldName(), field));
-
-        for (HivePartitionField partitionField : partitionList) {
-            String fieldName = partitionField.getFieldName();
-            if (StringUtils.isBlank(fieldName)) {
-                throw new BusinessException(ErrorCodeEnum.PARTITION_FIELD_NAME_IS_EMPTY);
-            }
-
-            SinkField sinkField = sinkFieldMap.get(fieldName);
-            if (sinkField == null) {
-                throw new BusinessException(
-                        String.format(ErrorCodeEnum.PARTITION_FIELD_NOT_FOUND.getMessage(), fieldName));
-            }
-            if (StringUtils.isBlank(sinkField.getSourceFieldName())) {
-                throw new BusinessException(
-                        String.format(ErrorCodeEnum.PARTITION_FIELD_NO_SOURCE_FIELD.getMessage(), fieldName));
-            }
-        }
-    }
-
-    /**
-     * Parse format
-     *
-     * @param formatName data serialization, support: csv, json, canal, avro, etc
-     * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
-     * @param separatorStr the separator of data content
-     * @param ignoreParseErrors whether ignore deserialization error data
-     * @return the format for serialized content
-     */
-    private static Format parsingFormat(
-            String formatName,
-            boolean wrapWithInlongMsg,
-            String separatorStr,
-            boolean ignoreParseErrors) {
-        Format format;
-        DataTypeEnum dataType = DataTypeEnum.forType(formatName);
-        switch (dataType) {
-            case CSV:
-                if (StringUtils.isNumeric(separatorStr)) {
-                    char dataSeparator = (char) Integer.parseInt(separatorStr);
-                    separatorStr = Character.toString(dataSeparator);
-                }
-                CsvFormat csvFormat = new CsvFormat(separatorStr);
-                csvFormat.setIgnoreParseErrors(ignoreParseErrors);
-                format = csvFormat;
-                break;
-            case AVRO:
-                format = new AvroFormat();
-                break;
-            case JSON:
-                JsonFormat jsonFormat = new JsonFormat();
-                jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
-                format = jsonFormat;
-                break;
-            case CANAL:
-                format = new CanalJsonFormat();
-                break;
-            case DEBEZIUM_JSON:
-                DebeziumJsonFormat debeziumJsonFormat = new DebeziumJsonFormat();
-                debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
-                format = debeziumJsonFormat;
-                break;
-            case RAW:
-                format = new RawFormat();
-                break;
-            default:
-                throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
-        }
-        if (wrapWithInlongMsg) {
-            Format innerFormat = format;
-            format = new InLongMsgFormat(innerFormat, false);
-        }
-        return format;
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index d23f08ddf3..8c58dd1985 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -21,8 +21,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.sort.node.NodeProviderFactory;
-import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.pojo.sort.node.NodeFactory;
 import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
 import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -211,9 +210,9 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
     private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
             List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
         List<Node> nodes = new ArrayList<>();
-        nodes.addAll(NodeProviderFactory.createExtractNodes(sources));
+        nodes.addAll(NodeFactory.createExtractNodes(sources));
         nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
-        nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
+        nodes.addAll(NodeFactory.createLoadNodes(sinks, constantFieldMap));
         return nodes;
     }