You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/06/08 07:20:45 UTC
[inlong] branch master updated: [INLONG-8171][Manager] Optimize the way of creating LoadNodes to make it easy to expand and maintain (#8181)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 716d5011b3 [INLONG-8171][Manager] Optimize the way of creating LoadNodes to make it easy to expand and maintain (#8181)
716d5011b3 is described below
commit 716d5011b3cab4783e619ba937d204570cc83f68
Author: chestnufang <65...@users.noreply.github.com>
AuthorDate: Thu Jun 8 15:20:38 2023 +0800
[INLONG-8171][Manager] Optimize the way of creating LoadNodes to make it easy to expand and maintain (#8181)
Co-authored-by: chestnufang <ch...@tencent.com>
Co-authored-by: Charles Zhang <do...@apache.org>
Co-authored-by: healchow <he...@gmail.com>
---
.../inlong/manager/common/consts/SinkType.java | 7 +-
.../inlong/manager/common/consts/SourceType.java | 7 +-
.../consts/{SinkType.java => StreamType.java} | 18 +-
...actory.java => ExtractNodeProviderFactory.java} | 50 +-
.../pojo/sort/node/LoadNodeProviderFactory.java | 90 +++
.../inlong/manager/pojo/sort/node/NodeFactory.java | 64 ++
.../sort/node/{ => base}/ExtractNodeProvider.java | 16 +-
.../pojo/sort/node/base/LoadNodeProvider.java | 100 +++
.../pojo/sort/node/{ => base}/NodeProvider.java | 10 +-
.../pojo/sort/node/extract/HudiProvider.java | 64 --
.../pojo/sort/node/extract/KafkaProvider.java | 93 ---
.../pojo/sort/node/extract/RedisProvider.java | 130 ----
.../pojo/sort/node/extract/SqlServerProvider.java | 63 --
.../sort/node/provider/ClickHouseProvider.java | 64 ++
.../pojo/sort/node/provider/DorisProvider.java | 89 +++
.../sort/node/provider/ElasticsearchProvider.java | 94 +++
.../pojo/sort/node/provider/GreenplumProvider.java | 64 ++
.../pojo/sort/node/provider/HBaseProvider.java | 69 ++
.../pojo/sort/node/provider/HDFSProvider.java | 76 ++
.../pojo/sort/node/provider/HiveProvider.java | 79 ++
.../pojo/sort/node/provider/HudiProvider.java | 98 +++
.../pojo/sort/node/provider/IcebergProvider.java | 69 ++
.../pojo/sort/node/provider/KafkaProvider.java | 159 ++++
.../pojo/sort/node/provider/KuduProvider.java | 63 ++
.../node/{extract => provider}/MongoProvider.java | 8 +-
.../MySQLBinlogProvider.java} | 12 +-
.../pojo/sort/node/provider/MySQLProvider.java | 68 ++
.../node/{extract => provider}/OracleProvider.java | 47 +-
.../PostgreSQLProvider.java} | 49 +-
.../node/{extract => provider}/PulsarProvider.java | 8 +-
.../pojo/sort/node/provider/RedisProvider.java | 262 +++++++
.../pojo/sort/node/provider/SQLServerProvider.java | 93 +++
.../pojo/sort/node/provider/StarRocksProvider.java | 93 +++
.../node/provider/TDSQLPostgreSQLProvider.java | 67 ++
.../node/{extract => provider}/TubeMqProvider.java | 8 +-
.../manager/pojo/sort/util/LoadNodeUtils.java | 822 ---------------------
.../resource/sort/DefaultSortConfigOperator.java | 7 +-
37 files changed, 1894 insertions(+), 1286 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index a74c9c3370..be60980f4c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -20,21 +20,16 @@ package org.apache.inlong.manager.common.consts;
/**
* Constants of sink type.
*/
-public class SinkType {
+public class SinkType extends StreamType {
public static final String HIVE = "HIVE";
- public static final String KAFKA = "KAFKA";
public static final String ICEBERG = "ICEBERG";
- public static final String HUDI = "HUDI";
public static final String CLICKHOUSE = "CLICKHOUSE";
public static final String HBASE = "HBASE";
- public static final String POSTGRESQL = "POSTGRESQL";
public static final String ELASTICSEARCH = "ELASTICSEARCH";
- public static final String SQLSERVER = "SQLSERVER";
public static final String HDFS = "HDFS";
public static final String GREENPLUM = "GREENPLUM";
public static final String MYSQL = "MYSQL";
- public static final String ORACLE = "ORACLE";
public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
public static final String DORIS = "DORIS";
public static final String STARROCKS = "STARROCKS";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index c0a718f2bb..e01989501a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -25,23 +25,18 @@ import java.util.Map;
/**
* Constants of source type.
*/
-public class SourceType {
+public class SourceType extends StreamType {
public static final String AUTO_PUSH = "AUTO_PUSH";
public static final String TUBEMQ = "TUBEMQ";
public static final String PULSAR = "PULSAR";
- public static final String KAFKA = "KAFKA";
public static final String FILE = "FILE";
public static final String MYSQL_SQL = "MYSQL_SQL";
public static final String MYSQL_BINLOG = "MYSQL_BINLOG";
- public static final String POSTGRESQL = "POSTGRESQL";
- public static final String ORACLE = "ORACLE";
- public static final String SQLSERVER = "SQLSERVER";
public static final String MONGODB = "MONGODB";
public static final String REDIS = "REDIS";
public static final String MQTT = "MQTT";
- public static final String HUDI = "HUDI";
public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new HashMap<String, TaskTypeEnum>() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
similarity index 60%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
index a74c9c3370..26685814de 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/StreamType.java
@@ -18,26 +18,14 @@
package org.apache.inlong.manager.common.consts;
/**
- * Constants of sink type.
+ * Constant for stream types, indicating that both StreamSource and StreamSink support these types.
*/
-public class SinkType {
+public class StreamType {
- public static final String HIVE = "HIVE";
public static final String KAFKA = "KAFKA";
- public static final String ICEBERG = "ICEBERG";
public static final String HUDI = "HUDI";
- public static final String CLICKHOUSE = "CLICKHOUSE";
- public static final String HBASE = "HBASE";
public static final String POSTGRESQL = "POSTGRESQL";
- public static final String ELASTICSEARCH = "ELASTICSEARCH";
public static final String SQLSERVER = "SQLSERVER";
- public static final String HDFS = "HDFS";
- public static final String GREENPLUM = "GREENPLUM";
- public static final String MYSQL = "MYSQL";
public static final String ORACLE = "ORACLE";
- public static final String TDSQLPOSTGRESQL = "TDSQLPOSTGRESQL";
- public static final String DORIS = "DORIS";
- public static final String STARROCKS = "STARROCKS";
- public static final String KUDU = "KUDU";
- public static final String REDIS = "REDIS";
+
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
similarity index 58%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
index 35c447ec7e..6e95425ef2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProviderFactory.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProviderFactory.java
@@ -19,30 +19,25 @@ package org.apache.inlong.manager.pojo.sort.node;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.sort.node.extract.HudiProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.KafkaProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.MongoProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.MysqlBinlogProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.OracleProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.PostgreSqlProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.PulsarProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.RedisProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.SqlServerProvider;
-import org.apache.inlong.manager.pojo.sort.node.extract.TubeMqProvider;
-import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.KafkaProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.MongoProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.MySQLBinlogProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.OracleProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.PostgreSQLProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.PulsarProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.RedisProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.SQLServerProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.TubeMqProvider;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
/**
- * The node provider factory.
+ * Factory of the extract node provider.
*/
-public class NodeProviderFactory {
+public class ExtractNodeProviderFactory {
/**
* The extract node provider collection
@@ -58,9 +53,9 @@ public class NodeProviderFactory {
EXTRACT_NODE_PROVIDER_LIST.add(new PulsarProvider());
EXTRACT_NODE_PROVIDER_LIST.add(new RedisProvider());
EXTRACT_NODE_PROVIDER_LIST.add(new TubeMqProvider());
- EXTRACT_NODE_PROVIDER_LIST.add(new SqlServerProvider());
- EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSqlProvider());
- EXTRACT_NODE_PROVIDER_LIST.add(new MysqlBinlogProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new SQLServerProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new PostgreSQLProvider());
+ EXTRACT_NODE_PROVIDER_LIST.add(new MySQLBinlogProvider());
}
@@ -77,17 +72,4 @@ public class NodeProviderFactory {
.orElseThrow(() -> new BusinessException(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT,
String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT.getMessage(), sourceType)));
}
-
- /**
- * Create extract nodes from the given sources.
- */
- public static List<ExtractNode> createExtractNodes(List<StreamSource> sourceInfos) {
- if (CollectionUtils.isEmpty(sourceInfos)) {
- return Lists.newArrayList();
- }
- return sourceInfos.stream().map(v -> {
- String sourceType = v.getSourceType();
- return getExtractNodeProvider(sourceType).createNode(v);
- }).collect(Collectors.toList());
- }
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/LoadNodeProviderFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/LoadNodeProviderFactory.java
new file mode 100644
index 0000000000..1df97b77fb
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/LoadNodeProviderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.ClickHouseProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.DorisProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.ElasticsearchProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.GreenplumProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HBaseProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HDFSProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HiveProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.HudiProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.IcebergProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.KafkaProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.KuduProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.MySQLProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.OracleProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.PostgreSQLProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.RedisProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.SQLServerProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.StarRocksProvider;
+import org.apache.inlong.manager.pojo.sort.node.provider.TDSQLPostgreSQLProvider;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Factory of the load node provider.
+ */
+public class LoadNodeProviderFactory {
+
+ /**
+ * The load node provider collection
+ */
+ private static final List<LoadNodeProvider> LOAD_NODE_PROVIDER_LIST = new ArrayList<>();
+
+ static {
+ // The Providers Parsing SinkInfo to LoadNode which sort needed
+ LOAD_NODE_PROVIDER_LIST.add(new KafkaProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new ClickHouseProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new DorisProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new ElasticsearchProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new GreenplumProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new HBaseProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new HDFSProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new HiveProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new HudiProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new IcebergProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new KuduProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new MySQLProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new OracleProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new PostgreSQLProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new RedisProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new SQLServerProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new StarRocksProvider());
+ LOAD_NODE_PROVIDER_LIST.add(new TDSQLPostgreSQLProvider());
+ }
+
+ /**
+ * Get load node provider
+ *
+ * @param sinkType the specified sink type
+ * @return the load node provider
+ */
+ public static LoadNodeProvider getLoadNodeProvider(String sinkType) {
+ return LOAD_NODE_PROVIDER_LIST.stream()
+ .filter(inst -> inst.accept(sinkType))
+ .findFirst()
+ .orElseThrow(() -> new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage(), sinkType)));
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
new file mode 100644
index 0000000000..cc6a545490
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node;
+
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The node factory
+ */
+public class NodeFactory {
+
+ /**
+ * Create extract nodes from the given sources.
+ */
+ public static List<ExtractNode> createExtractNodes(List<StreamSource> sourceInfos) {
+ if (CollectionUtils.isEmpty(sourceInfos)) {
+ return Lists.newArrayList();
+ }
+ return sourceInfos.stream().map(v -> {
+ String sourceType = v.getSourceType();
+ return ExtractNodeProviderFactory.getExtractNodeProvider(sourceType).createExtractNode(v);
+ }).collect(Collectors.toList());
+ }
+
+ /**
+ * Create load nodes from the given sinks.
+ */
+ public static List<LoadNode> createLoadNodes(List<StreamSink> sinkInfos,
+ Map<String, StreamField> constantFieldMap) {
+ if (CollectionUtils.isEmpty(sinkInfos)) {
+ return Lists.newArrayList();
+ }
+ return sinkInfos.stream().map(v -> {
+ String sinkType = v.getSinkType();
+ return LoadNodeProviderFactory.getLoadNodeProvider(sinkType).createLoadNode(v, constantFieldMap);
+ }).collect(Collectors.toList());
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
similarity index 90%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index aeed2df4c8..c5aea94d08 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/ExtractNodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node;
+package org.apache.inlong.manager.pojo.sort.node.base;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
@@ -43,30 +43,22 @@ import java.util.stream.Collectors;
*/
public interface ExtractNodeProvider extends NodeProvider {
- /**
- * Determines whether the current instance matches the specified type.
- *
- * @param sourceType the specified source type
- * @return whether the current instance matches the specified type
- */
- Boolean accept(String sourceType);
-
/**
* Create extract node by stream node info
*
* @param nodeInfo stream node info
* @return the extract node
*/
- ExtractNode createNode(StreamNode nodeInfo);
+ ExtractNode createExtractNode(StreamNode nodeInfo);
/**
- * Parse FieldInfos
+ * Parse StreamFieldInfos
*
* @param streamFields The stream fields
* @param nodeId The node id
* @return FieldInfo list
*/
- default List<FieldInfo> parseFieldInfos(List<StreamField> streamFields, String nodeId) {
+ default List<FieldInfo> parseStreamFieldInfos(List<StreamField> streamFields, String nodeId) {
// Filter constant fields
return streamFields.stream().filter(s -> Objects.isNull(s.getFieldValue()))
.map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, nodeId))
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
new file mode 100644
index 0000000000..2302c15943
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/LoadNodeProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.base;
+
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.formats.common.StringTypeInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Interface of the load node provider
+ */
+public interface LoadNodeProvider extends NodeProvider {
+
+ /**
+ * Create load node by stream node info
+ *
+ * @param nodeInfo stream node info
+ * @param constantFieldMap the constant field map
+ * @return the load node
+ */
+ LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap);
+
+ /**
+ * Parse FieldInfos
+ *
+ * @param sinkFields The stream sink fields
+ * @param nodeId The node id
+ * @return FieldInfo list
+ */
+ default List<FieldInfo> parseSinkFieldInfos(List<SinkField> sinkFields, String nodeId) {
+ return sinkFields.stream().map(field -> FieldInfoUtils.parseSinkFieldInfo(field, nodeId))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Parse information field of data sink.
+ */
+ default List<FieldRelation> parseSinkFields(List<SinkField> fieldList,
+ Map<String, StreamField> constantFieldMap) {
+ if (CollectionUtils.isEmpty(fieldList)) {
+ return Lists.newArrayList();
+ }
+ return fieldList.stream()
+ .filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
+ .map(field -> {
+ FieldInfo outputField = new FieldInfo(field.getFieldName(),
+ FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
+ FunctionParam inputField;
+ String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
+ StreamField constantField = constantFieldMap.get(fieldKey);
+ if (constantField != null) {
+ if (outputField.getFormatInfo() != null
+ && outputField.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
+ inputField = new StringConstantParam(constantField.getFieldValue());
+ } else {
+ inputField = new ConstantParam(constantField.getFieldValue());
+ }
+ } else if (FieldType.FUNCTION.name().equalsIgnoreCase(field.getSourceFieldType())) {
+ inputField = new CustomFunction(field.getSourceFieldName());
+ } else {
+ inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
+ FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
+ }
+ return new FieldRelation(inputField, outputField);
+ }).collect(Collectors.toList());
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
similarity index 82%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
index f8f7efd030..a40bd1106d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/NodeProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/NodeProvider.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node;
+package org.apache.inlong.manager.pojo.sort.node.base;
import java.util.Map;
import java.util.Objects;
@@ -26,6 +26,14 @@ import java.util.stream.Collectors;
*/
public interface NodeProvider {
+ /**
+ * Determines whether the current instance matches the specified type.
+ *
+ * @param streamType the specified type
+ * @return whether the current instance matches the specified type
+ */
+ Boolean accept(String streamType);
+
/**
* Parse properties
*
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
deleted file mode 100644
index ebbd00e326..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/HudiProvider.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating Hudi extract nodes.
- */
-public class HudiProvider implements ExtractNodeProvider {
-
- @Override
- public Boolean accept(String sourceType) {
- return SourceType.HUDI.equals(sourceType);
- }
-
- @Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
- HudiSource source = (HudiSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
- Map<String, String> properties = parseProperties(source.getProperties());
-
- return new HudiExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- source.getCatalogUri(),
- source.getWarehouse(),
- source.getDbName(),
- source.getTableName(),
- CatalogType.HIVE,
- source.getCheckIntervalInMinus(),
- source.isReadStreamingSkipCompaction(),
- source.getReadStartCommit(),
- properties,
- source.getExtList());
- }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
deleted file mode 100644
index 88b63f28e7..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/KafkaProvider.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
-import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
-import org.apache.inlong.sort.protocol.node.format.Format;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating Kafka extract nodes.
- */
-public class KafkaProvider implements ExtractNodeProvider {
-
- @Override
- public Boolean accept(String sourceType) {
- return SourceType.KAFKA.equals(sourceType);
- }
-
- @Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
- KafkaSource kafkaSource = (KafkaSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
- Map<String, String> properties = parseProperties(kafkaSource.getProperties());
-
- String topic = kafkaSource.getTopic();
- String bootstrapServers = kafkaSource.getBootstrapServers();
-
- Format format = parsingFormat(
- kafkaSource.getSerializationType(),
- kafkaSource.isWrapWithInlongMsg(),
- kafkaSource.getDataSeparator(),
- kafkaSource.isIgnoreParseErrors());
-
- KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
- KafkaScanStartupMode startupMode;
- switch (kafkaOffset) {
- case EARLIEST:
- startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
- break;
- case SPECIFIC:
- startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
- break;
- case TIMESTAMP_MILLIS:
- startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
- break;
- case LATEST:
- default:
- startupMode = KafkaScanStartupMode.LATEST_OFFSET;
- }
- final String primaryKey = kafkaSource.getPrimaryKey();
- String groupId = kafkaSource.getGroupId();
- String partitionOffset = kafkaSource.getPartitionOffsets();
- String scanTimestampMillis = kafkaSource.getTimestampMillis();
- return new KafkaExtractNode(kafkaSource.getSourceName(),
- kafkaSource.getSourceName(),
- fieldInfos,
- null,
- properties,
- topic,
- bootstrapServers,
- format,
- startupMode,
- primaryKey,
- groupId,
- partitionOffset,
- scanTimestampMillis);
- }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
deleted file mode 100644
index 18f435e89f..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/RedisProvider.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
-import org.apache.inlong.manager.pojo.source.redis.RedisSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.LookupOptions;
-import org.apache.inlong.sort.protocol.enums.RedisCommand;
-import org.apache.inlong.sort.protocol.enums.RedisMode;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating Redis extract nodes.
- */
-public class RedisProvider implements ExtractNodeProvider {
-
- @Override
- public Boolean accept(String sourceType) {
- return SourceType.REDIS.equals(sourceType);
- }
-
- @Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
- RedisSource source = (RedisSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
- Map<String, String> properties = parseProperties(source.getProperties());
-
- RedisMode redisMode = RedisMode.forName(source.getRedisMode());
- switch (redisMode) {
- case STANDALONE:
- return new RedisExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- RedisCommand.forName(source.getCommand()),
- source.getHost(),
- source.getPort(),
- source.getPassword(),
- source.getAdditionalKey(),
- source.getDatabase(),
- source.getTimeout(),
- source.getSoTimeout(),
- source.getMaxTotal(),
- source.getMaxIdle(),
- source.getMinIdle(),
- parseLookupOptions(source.getLookupOptions()));
- case SENTINEL:
- return new RedisExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- RedisCommand.forName(source.getCommand()),
- source.getMasterName(),
- source.getSentinelsInfo(),
- source.getPassword(),
- source.getAdditionalKey(),
- source.getDatabase(),
- source.getTimeout(),
- source.getSoTimeout(),
- source.getMaxTotal(),
- source.getMaxIdle(),
- source.getMinIdle(),
- parseLookupOptions(source.getLookupOptions()));
- case CLUSTER:
- return new RedisExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- RedisCommand.forName(source.getCommand()),
- source.getClusterNodes(),
- source.getPassword(),
- source.getAdditionalKey(),
- source.getDatabase(),
- source.getTimeout(),
- source.getSoTimeout(),
- source.getMaxTotal(),
- source.getMaxIdle(),
- source.getMinIdle(),
- parseLookupOptions(source.getLookupOptions()));
- default:
- throw new IllegalArgumentException(String.format("Unsupported redis-mode=%s for Inlong", redisMode));
- }
- }
-
- /**
- * Parse LookupOptions
- *
- * @param options RedisLookupOptions
- * @return LookupOptions
- */
- private static LookupOptions parseLookupOptions(RedisLookupOptions options) {
- if (options == null) {
- return null;
- }
- return new LookupOptions(options.getLookupCacheMaxRows(), options.getLookupCacheTtl(),
- options.getLookupMaxRetries(), options.getLookupAsync());
- }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
deleted file mode 100644
index f9ca5f985a..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/SqlServerProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.node.extract;
-
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
-import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
-import org.apache.inlong.manager.pojo.stream.StreamNode;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.node.ExtractNode;
-import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * The Provider for creating SQLServer extract nodes.
- */
-public class SqlServerProvider implements ExtractNodeProvider {
-
- @Override
- public Boolean accept(String sourceType) {
- return SourceType.SQLSERVER.equals(sourceType);
- }
-
- @Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
- SQLServerSource source = (SQLServerSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
- Map<String, String> properties = parseProperties(source.getProperties());
-
- return new SqlServerExtractNode(
- source.getSourceName(),
- source.getSourceName(),
- fieldInfos,
- null,
- properties,
- source.getPrimaryKey(),
- source.getHostname(),
- source.getPort(),
- source.getUsername(),
- source.getPassword(),
- source.getDatabase(),
- source.getSchemaName(),
- source.getTableName(),
- source.getServerTimezone());
- }
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ClickHouseProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ClickHouseProvider.java
new file mode 100644
index 0000000000..3838633957
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ClickHouseProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating ClickHouse load nodes.
+ */
+public class ClickHouseProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.CLICKHOUSE.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ ClickHouseSink streamSink = (ClickHouseSink) nodeInfo;
+ Map<String, String> properties = parseProperties(streamSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(streamSink.getSinkFieldList(), streamSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
+ return new ClickHouseLoadNode(
+ streamSink.getSinkName(),
+ streamSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ streamSink.getDbName() + "." + streamSink.getTableName(),
+ streamSink.getJdbcUrl() + "/" + streamSink.getDbName(),
+ streamSink.getUsername(),
+ streamSink.getPassword(),
+ streamSink.getPrimaryKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
new file mode 100644
index 0000000000..2586895f8a
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/DorisProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Doris load nodes.
+ */
+public class DorisProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.DORIS.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ DorisSink dorisSink = (DorisSink) nodeInfo;
+ Map<String, String> properties = parseProperties(dorisSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(dorisSink.getSinkFieldList(), dorisSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(dorisSink.getSinkFieldList(), constantFieldMap);
+ Format format = null;
+ if (dorisSink.getSinkMultipleEnable() != null && dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank(
+ dorisSink.getSinkMultipleFormat())) {
+ DataTypeEnum dataType = DataTypeEnum.forType(dorisSink.getSinkMultipleFormat());
+ switch (dataType) {
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ format = new DebeziumJsonFormat();
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported dataType=%s for doris", dataType));
+ }
+ }
+ return new DorisLoadNode(
+ dorisSink.getSinkName(),
+ dorisSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ dorisSink.getFeNodes(),
+ dorisSink.getUsername(),
+ dorisSink.getPassword(),
+ dorisSink.getTableIdentifier(),
+ null,
+ dorisSink.getSinkMultipleEnable(),
+ format,
+ dorisSink.getDatabasePattern(),
+ dorisSink.getTablePattern());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
new file mode 100644
index 0000000000..5072f9ac46
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/ElasticsearchProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Elasticsearch load nodes.
+ */
+public class ElasticsearchProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.ELASTICSEARCH.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ ElasticsearchSink elasticsearchSink = (ElasticsearchSink) nodeInfo;
+ Map<String, String> properties = parseProperties(elasticsearchSink.getProperties());
+ List<SinkField> sinkFieldList = elasticsearchSink.getSinkFieldList();
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(sinkFieldList, elasticsearchSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(sinkFieldList, constantFieldMap);
+ Format format = null;
+ if (elasticsearchSink.getSinkMultipleEnable() != null && elasticsearchSink.getSinkMultipleEnable()
+ && StringUtils.isNotBlank(
+ elasticsearchSink.getSinkMultipleFormat())) {
+ DataTypeEnum dataType = DataTypeEnum.forType(elasticsearchSink.getSinkMultipleFormat());
+ switch (dataType) {
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ format = new DebeziumJsonFormat();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported dataType=%s for elasticsearch", dataType));
+ }
+ }
+ return new ElasticsearchLoadNode(
+ elasticsearchSink.getSinkName(),
+ elasticsearchSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ elasticsearchSink.getIndexName(),
+ elasticsearchSink.getHosts(),
+ elasticsearchSink.getUsername(),
+ elasticsearchSink.getPassword(),
+ elasticsearchSink.getDocumentType(),
+ elasticsearchSink.getPrimaryKey(),
+ elasticsearchSink.getEsVersion(),
+ elasticsearchSink.getSinkMultipleEnable(),
+ format,
+ elasticsearchSink.getIndexPattern());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/GreenplumProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/GreenplumProvider.java
new file mode 100644
index 0000000000..df5618bce0
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/GreenplumProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Greenplum load nodes.
+ */
+public class GreenplumProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.GREENPLUM.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ GreenplumSink greenplumSink = (GreenplumSink) nodeInfo;
+ Map<String, String> properties = parseProperties(greenplumSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(greenplumSink.getSinkFieldList(), greenplumSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(greenplumSink.getSinkFieldList(), constantFieldMap);
+ return new GreenplumLoadNode(
+ greenplumSink.getSinkName(),
+ greenplumSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ greenplumSink.getJdbcUrl(),
+ greenplumSink.getUsername(),
+ greenplumSink.getPassword(),
+ greenplumSink.getTableName(),
+ greenplumSink.getPrimaryKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HBaseProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HBaseProvider.java
new file mode 100644
index 0000000000..b00ae617cf
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HBaseProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating HBase load nodes.
+ */
+public class HBaseProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.HBASE.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ HBaseSink hbaseSink = (HBaseSink) nodeInfo;
+ Map<String, String> properties = parseProperties(hbaseSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(hbaseSink.getSinkFieldList(), hbaseSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(hbaseSink.getSinkFieldList(), constantFieldMap);
+ return new HbaseLoadNode(
+ hbaseSink.getSinkName(),
+ hbaseSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ Lists.newArrayList(),
+ null,
+ null,
+ properties,
+ hbaseSink.getTableName(),
+ hbaseSink.getNamespace(),
+ hbaseSink.getZkQuorum(),
+ hbaseSink.getRowKey(),
+ hbaseSink.getBufferFlushMaxSize(),
+ hbaseSink.getZkNodeParent(),
+ hbaseSink.getBufferFlushMaxRows(),
+ hbaseSink.getBufferFlushInterval());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HDFSProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HDFSProvider.java
new file mode 100644
index 0000000000..94d5415043
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HDFSProvider.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The Provider for creating HDFS load nodes.
+ */
+public class HDFSProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.HDFS.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ HDFSSink hdfsSink = (HDFSSink) nodeInfo;
+ Map<String, String> properties = parseProperties(hdfsSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(hdfsSink.getSinkFieldList(), hdfsSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(hdfsSink.getSinkFieldList(), constantFieldMap);
+ List<FieldInfo> partitionFields = Lists.newArrayList();
+ if (CollectionUtils.isNotEmpty(hdfsSink.getPartitionFieldList())) {
+ partitionFields = hdfsSink.getPartitionFieldList().stream()
+ .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hdfsSink.getSinkName(),
+ FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
+ partitionField.getFieldFormat())))
+ .collect(Collectors.toList());
+ }
+
+ return new FileSystemLoadNode(
+ hdfsSink.getSinkName(),
+ hdfsSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ Lists.newArrayList(),
+ hdfsSink.getDataPath(),
+ hdfsSink.getFileFormat(),
+ null,
+ properties,
+ partitionFields,
+ hdfsSink.getServerTimeZone());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HiveProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HiveProvider.java
new file mode 100644
index 0000000000..6595409def
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HiveProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The Provider for creating Hive load nodes.
+ */
+public class HiveProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.HIVE.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ HiveSink hiveSink = (HiveSink) nodeInfo;
+ Map<String, String> properties = parseProperties(hiveSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(hiveSink.getSinkFieldList(), hiveSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(hiveSink.getSinkFieldList(), constantFieldMap);
+ List<FieldInfo> partitionFields = Lists.newArrayList();
+ if (CollectionUtils.isNotEmpty(hiveSink.getPartitionFieldList())) {
+ partitionFields = hiveSink.getPartitionFieldList().stream()
+ .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hiveSink.getSinkName(),
+ FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
+ partitionField.getFieldFormat())))
+ .collect(Collectors.toList());
+ }
+ return new HiveLoadNode(
+ hiveSink.getSinkName(),
+ hiveSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ Lists.newArrayList(),
+ null,
+ null,
+ properties,
+ null,
+ hiveSink.getDbName(),
+ hiveSink.getTableName(),
+ hiveSink.getHiveConfDir(),
+ hiveSink.getHiveVersion(),
+ null,
+ partitionFields);
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HudiProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HudiProvider.java
new file mode 100644
index 0000000000..24dd749e41
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/HudiProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.hudi.HudiSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.HudiConstant;
+import org.apache.inlong.sort.protocol.constant.HudiConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.HudiExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Hudi extract or load nodes.
+ */
+public class HudiProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String streamType) {
+ return StreamType.HUDI.equals(streamType);
+ }
+
+ @Override
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+ HudiSource source = (HudiSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
+ Map<String, String> properties = parseProperties(source.getProperties());
+
+ return new HudiExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ source.getCatalogUri(),
+ source.getWarehouse(),
+ source.getDbName(),
+ source.getTableName(),
+ CatalogType.HIVE,
+ source.getCheckIntervalInMinus(),
+ source.isReadStreamingSkipCompaction(),
+ source.getReadStartCommit(),
+ properties,
+ source.getExtList());
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ HudiSink hudiSink = (HudiSink) nodeInfo;
+ Map<String, String> properties = parseProperties(hudiSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(hudiSink.getSinkFieldList(), hudiSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(hudiSink.getSinkFieldList(), constantFieldMap);
+ HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
+
+ return new HudiLoadNode(
+ hudiSink.getSinkName(),
+ hudiSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ hudiSink.getDbName(),
+ hudiSink.getTableName(),
+ hudiSink.getPrimaryKey(),
+ catalogType,
+ hudiSink.getCatalogUri(),
+ hudiSink.getWarehouse(),
+ hudiSink.getExtList(),
+ hudiSink.getPartitionKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
new file mode 100644
index 0000000000..1df0c0f567
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/IcebergProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Iceberg load nodes.
+ */
+public class IcebergProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.ICEBERG.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ IcebergSink icebergSink = (IcebergSink) nodeInfo;
+ Map<String, String> properties = parseProperties(icebergSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(icebergSink.getSinkFieldList(), icebergSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(icebergSink.getSinkFieldList(), constantFieldMap);
+ IcebergConstant.CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
+
+ return new IcebergLoadNode(
+ icebergSink.getSinkName(),
+ icebergSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ icebergSink.getDbName(),
+ icebergSink.getTableName(),
+ icebergSink.getPrimaryKey(),
+ catalogType,
+ icebergSink.getCatalogUri(),
+ icebergSink.getWarehouse());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
new file mode 100644
index 0000000000..adb1d911e1
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Kafka extract or load nodes.
+ */
+public class KafkaProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String streamType) {
+ return StreamType.KAFKA.equals(streamType);
+ }
+
+ @Override
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+ KafkaSource kafkaSource = (KafkaSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(kafkaSource.getFieldList(), kafkaSource.getSourceName());
+ Map<String, String> properties = parseProperties(kafkaSource.getProperties());
+
+ String topic = kafkaSource.getTopic();
+ String bootstrapServers = kafkaSource.getBootstrapServers();
+
+ Format format = parsingFormat(
+ kafkaSource.getSerializationType(),
+ kafkaSource.isWrapWithInlongMsg(),
+ kafkaSource.getDataSeparator(),
+ kafkaSource.isIgnoreParseErrors());
+
+ KafkaOffset kafkaOffset = KafkaOffset.forName(kafkaSource.getAutoOffsetReset());
+ KafkaScanStartupMode startupMode;
+ switch (kafkaOffset) {
+ case EARLIEST:
+ startupMode = KafkaScanStartupMode.EARLIEST_OFFSET;
+ break;
+ case SPECIFIC:
+ startupMode = KafkaScanStartupMode.SPECIFIC_OFFSETS;
+ break;
+ case TIMESTAMP_MILLIS:
+ startupMode = KafkaScanStartupMode.TIMESTAMP_MILLIS;
+ break;
+ case LATEST:
+ default:
+ startupMode = KafkaScanStartupMode.LATEST_OFFSET;
+ }
+ final String primaryKey = kafkaSource.getPrimaryKey();
+ String groupId = kafkaSource.getGroupId();
+ String partitionOffset = kafkaSource.getPartitionOffsets();
+ String scanTimestampMillis = kafkaSource.getTimestampMillis();
+ return new KafkaExtractNode(kafkaSource.getSourceName(),
+ kafkaSource.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ topic,
+ bootstrapServers,
+ format,
+ startupMode,
+ primaryKey,
+ groupId,
+ partitionOffset,
+ scanTimestampMillis);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ KafkaSink kafkaSink = (KafkaSink) nodeInfo;
+ Map<String, String> properties = parseProperties(kafkaSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(kafkaSink.getSinkFieldList(), kafkaSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(kafkaSink.getSinkFieldList(), constantFieldMap);
+ Integer sinkParallelism = null;
+ if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
+ sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
+ }
+ DataTypeEnum dataType = DataTypeEnum.forType(kafkaSink.getSerializationType());
+ Format format;
+ switch (dataType) {
+ case CSV:
+ format = new CsvFormat();
+ break;
+ case AVRO:
+ format = new AvroFormat();
+ break;
+ case JSON:
+ format = new JsonFormat();
+ break;
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ format = new DebeziumJsonFormat();
+ break;
+ case RAW:
+ format = new RawFormat();
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
+ }
+
+ return new KafkaLoadNode(
+ kafkaSink.getSinkName(),
+ kafkaSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ Lists.newArrayList(),
+ null,
+ kafkaSink.getTopicName(),
+ kafkaSink.getBootstrapServers(),
+ format,
+ sinkParallelism,
+ properties,
+ kafkaSink.getPrimaryKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KuduProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KuduProvider.java
new file mode 100644
index 0000000000..3ddd3e5e40
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KuduProvider.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Kudu load nodes.
+ */
+public class KuduProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.KUDU.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ KuduSink kuduSink = (KuduSink) nodeInfo;
+ Map<String, String> properties = parseProperties(kuduSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(kuduSink.getSinkFieldList(), kuduSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(kuduSink.getSinkFieldList(), constantFieldMap);
+
+ return new KuduLoadNode(
+ kuduSink.getSinkName(),
+ kuduSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ kuduSink.getMasters(),
+ kuduSink.getTableName(),
+ kuduSink.getPartitionKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MongoProvider.java
similarity index 86%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MongoProvider.java
index 72013e2386..2b87c75221 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MongoProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MongoProvider.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.mongodb.MongoDBSource;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -39,9 +39,9 @@ public class MongoProvider implements ExtractNodeProvider {
}
@Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
MongoDBSource source = (MongoDBSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
Map<String, String> properties = parseProperties(source.getProperties());
return new MongoExtractNode(
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
similarity index 87%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
index 9732a8d277..a95fc99008 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/MysqlBinlogProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLBinlogProvider.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -31,9 +31,9 @@ import java.util.List;
import java.util.Map;
/**
- * The Provider for creating MysqlBinlog extract nodes.
+ * The Provider for creating MySQLBinlog extract nodes.
*/
-public class MysqlBinlogProvider implements ExtractNodeProvider {
+public class MySQLBinlogProvider implements ExtractNodeProvider {
@Override
public Boolean accept(String sourceType) {
@@ -41,9 +41,9 @@ public class MysqlBinlogProvider implements ExtractNodeProvider {
}
@Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
MySQLBinlogSource binlogSource = (MySQLBinlogSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(binlogSource.getFieldList(), binlogSource.getSourceName());
Map<String, String> properties = parseProperties(binlogSource.getProperties());
final String database = binlogSource.getDatabaseWhiteList();
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLProvider.java
new file mode 100644
index 0000000000..8a46d4bacb
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/MySQLProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
+import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating MySQL load nodes.
+ */
+public class MySQLProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.MYSQL.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ MySQLSink mysqlSink = (MySQLSink) nodeInfo;
+ Map<String, String> properties = parseProperties(mysqlSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(mysqlSink.getSinkFieldList(), mysqlSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(mysqlSink.getSinkFieldList(), constantFieldMap);
+
+ return new MySqlLoadNode(
+ mysqlSink.getSinkName(),
+ mysqlSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ Lists.newArrayList(),
+ null,
+ null,
+ properties,
+ MySQLSinkDTO.setDbNameToUrl(mysqlSink.getJdbcUrl(), mysqlSink.getDatabaseName()),
+ mysqlSink.getUsername(),
+ mysqlSink.getPassword(),
+ mysqlSink.getTableName(),
+ mysqlSink.getPrimaryKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OracleProvider.java
similarity index 52%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OracleProvider.java
index 72993b723a..12643e0370 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/OracleProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OracleProvider.java
@@ -15,16 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
+import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import org.apache.commons.lang3.StringUtils;
@@ -32,19 +38,19 @@ import java.util.List;
import java.util.Map;
/**
- * The Provider for creating Oracle extract nodes.
+ * The Provider for creating Oracle extract or load nodes.
*/
-public class OracleProvider implements ExtractNodeProvider {
+public class OracleProvider implements ExtractNodeProvider, LoadNodeProvider {
@Override
- public Boolean accept(String sourceType) {
- return SourceType.ORACLE.equals(sourceType);
+ public Boolean accept(String streamType) {
+ return StreamType.ORACLE.equals(streamType);
}
@Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
OracleSource source = (OracleSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
Map<String, String> properties = parseProperties(source.getProperties());
ScanStartUpMode scanStartupMode = StringUtils.isBlank(source.getScanStartupMode())
@@ -66,4 +72,27 @@ public class OracleProvider implements ExtractNodeProvider {
source.getPort(),
scanStartupMode);
}
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ OracleSink oracleSink = (OracleSink) nodeInfo;
+ Map<String, String> properties = parseProperties(oracleSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(oracleSink.getSinkFieldList(), oracleSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(oracleSink.getSinkFieldList(), constantFieldMap);
+
+ return new OracleLoadNode(
+ oracleSink.getSinkName(),
+ oracleSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ oracleSink.getJdbcUrl(),
+ oracleSink.getUsername(),
+ oracleSink.getPassword(),
+ oracleSink.getTableName(),
+ oracleSink.getPrimaryKey());
+ }
}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
similarity index 50%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
index 128c30018e..0f16379d4b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PostgreSqlProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PostgreSQLProvider.java
@@ -15,33 +15,40 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
-import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
import java.util.List;
import java.util.Map;
/**
- * The Provider for creating PostgreSql extract nodes.
+ * The Provider for creating PostgreSQL extract or load nodes.
*/
-public class PostgreSqlProvider implements ExtractNodeProvider {
+public class PostgreSQLProvider implements ExtractNodeProvider, LoadNodeProvider {
@Override
- public Boolean accept(String sourceType) {
- return SourceType.POSTGRESQL.equals(sourceType);
+ public Boolean accept(String streamType) {
+ return StreamType.POSTGRESQL.equals(streamType);
}
@Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
PostgreSQLSource postgreSQLSource = (PostgreSQLSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(postgreSQLSource.getFieldList(), postgreSQLSource.getSourceName());
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(postgreSQLSource.getFieldList(),
+ postgreSQLSource.getSourceName());
Map<String, String> properties = parseProperties(postgreSQLSource.getProperties());
return new PostgresExtractNode(postgreSQLSource.getSourceName(),
@@ -61,4 +68,28 @@ public class PostgreSqlProvider implements ExtractNodeProvider {
postgreSQLSource.getServerTimeZone(),
postgreSQLSource.getScanStartupMode());
}
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ PostgreSQLSink postgreSQLSink = (PostgreSQLSink) nodeInfo;
+ Map<String, String> properties = parseProperties(postgreSQLSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(postgreSQLSink.getSinkFieldList(),
+ postgreSQLSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(postgreSQLSink.getSinkFieldList(), constantFieldMap);
+
+ return new PostgresLoadNode(
+ postgreSQLSink.getSinkName(),
+ postgreSQLSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ postgreSQLSink.getJdbcUrl(),
+ postgreSQLSink.getUsername(),
+ postgreSQLSink.getPassword(),
+ postgreSQLSink.getDbName() + "." + postgreSQLSink.getTableName(),
+ postgreSQLSink.getPrimaryKey());
+ }
}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
similarity index 90%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 14cde851b7..da20e8b203 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/PulsarProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -43,9 +43,9 @@ public class PulsarProvider implements ExtractNodeProvider {
}
@Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
PulsarSource pulsarSource = (PulsarSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(pulsarSource.getFieldList(), pulsarSource.getSourceName());
Map<String, String> properties = parseProperties(pulsarSource.getProperties());
String fullTopicName =
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/RedisProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/RedisProvider.java
new file mode 100644
index 0000000000..8bc083007a
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/RedisProvider.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.redis.RedisLookupOptions;
+import org.apache.inlong.manager.pojo.source.redis.RedisSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.LookupOptions;
+import org.apache.inlong.sort.protocol.enums.RedisCommand;
+import org.apache.inlong.sort.protocol.enums.RedisMode;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.RawFormat;
+import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating Redis extract or load nodes.
+ */
+public class RedisProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sourceType) {
+ return SourceType.REDIS.equals(sourceType);
+ }
+
+ @Override
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+ RedisSource source = (RedisSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
+ Map<String, String> properties = parseProperties(source.getProperties());
+
+ RedisMode redisMode = RedisMode.forName(source.getRedisMode());
+ switch (redisMode) {
+ case STANDALONE:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getHost(),
+ source.getPort(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ case SENTINEL:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getMasterName(),
+ source.getSentinelsInfo(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ case CLUSTER:
+ return new RedisExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ RedisCommand.forName(source.getCommand()),
+ source.getClusterNodes(),
+ source.getPassword(),
+ source.getAdditionalKey(),
+ source.getDatabase(),
+ source.getTimeout(),
+ source.getSoTimeout(),
+ source.getMaxTotal(),
+ source.getMaxIdle(),
+ source.getMinIdle(),
+ parseLookupOptions(source.getLookupOptions()));
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported redis-mode=%s for Inlong", redisMode));
+ }
+ }
+
+ /**
+ * Parse LookupOptions
+ *
+ * @param options RedisLookupOptions
+ * @return LookupOptions
+ */
+ private static LookupOptions parseLookupOptions(RedisLookupOptions options) {
+ if (options == null) {
+ return null;
+ }
+ return new LookupOptions(options.getLookupCacheMaxRows(), options.getLookupCacheTtl(),
+ options.getLookupMaxRetries(), options.getLookupAsync());
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ RedisSink redisSink = (RedisSink) nodeInfo;
+ Map<String, String> properties = parseProperties(redisSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(redisSink.getSinkFieldList(), redisSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(redisSink.getSinkFieldList(), constantFieldMap);
+
+ String clusterMode = redisSink.getClusterMode();
+ String dataType = redisSink.getDataType();
+ String schemaMapMode = redisSink.getSchemaMapMode();
+ String host = redisSink.getHost();
+ Integer port = redisSink.getPort();
+ String clusterNodes = redisSink.getClusterNodes();
+ String masterName = redisSink.getMasterName();
+ String sentinelsInfo = redisSink.getSentinelsInfo();
+ Integer database = redisSink.getDatabase();
+ String password = redisSink.getPassword();
+ Integer ttl = redisSink.getTtl();
+ Integer timeout = redisSink.getTimeout();
+ Integer soTimeout = redisSink.getSoTimeout();
+ Integer maxTotal = redisSink.getMaxTotal();
+ Integer maxIdle = redisSink.getMaxIdle();
+ Integer minIdle = redisSink.getMinIdle();
+ Integer maxRetries = redisSink.getMaxRetries();
+
+ Format format = parsingDataFormat(
+ redisSink.getFormatDataType(),
+ false,
+ redisSink.getFormatDataSeparator(),
+ false);
+
+ return new RedisLoadNode(
+ redisSink.getSinkName(),
+ redisSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ clusterMode,
+ dataType,
+ schemaMapMode,
+ host,
+ port,
+ clusterNodes,
+ masterName,
+ sentinelsInfo,
+ database,
+ password,
+ ttl,
+ format,
+ timeout,
+ soTimeout,
+ maxTotal,
+ maxIdle,
+ minIdle,
+ maxRetries);
+ }
+
+ /**
+ * Parse format
+ *
+ * @param formatName data serialization, support: csv, json, canal, avro, etc
+ * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
+ * @param separatorStr the separator of data content
+ * @param ignoreParseErrors whether ignore deserialization error data
+ * @return the format for serialized content
+ */
+ private Format parsingDataFormat(
+ String formatName,
+ boolean wrapWithInlongMsg,
+ String separatorStr,
+ boolean ignoreParseErrors) {
+ Format format;
+ DataTypeEnum dataType = DataTypeEnum.forType(formatName);
+ switch (dataType) {
+ case CSV:
+ if (StringUtils.isNumeric(separatorStr)) {
+ char dataSeparator = (char) Integer.parseInt(separatorStr);
+ separatorStr = Character.toString(dataSeparator);
+ }
+ CsvFormat csvFormat = new CsvFormat(separatorStr);
+ csvFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = csvFormat;
+ break;
+ case AVRO:
+ format = new AvroFormat();
+ break;
+ case JSON:
+ JsonFormat jsonFormat = new JsonFormat();
+ jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = jsonFormat;
+ break;
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ DebeziumJsonFormat debeziumJsonFormat = new DebeziumJsonFormat();
+ debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
+ format = debeziumJsonFormat;
+ break;
+ case RAW:
+ format = new RawFormat();
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
+ }
+ if (wrapWithInlongMsg) {
+ Format innerFormat = format;
+ format = new InLongMsgFormat(innerFormat, false);
+ }
+ return format;
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/SQLServerProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/SQLServerProvider.java
new file mode 100644
index 0000000000..27aa06c92d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/SQLServerProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.StreamType;
+import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
+import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating SQLServer extract or load nodes.
+ */
+public class SQLServerProvider implements ExtractNodeProvider, LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String streamType) {
+ return StreamType.SQLSERVER.equals(streamType);
+ }
+
+ @Override
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
+ SQLServerSource source = (SQLServerSource) streamNodeInfo;
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
+ Map<String, String> properties = parseProperties(source.getProperties());
+
+ return new SqlServerExtractNode(
+ source.getSourceName(),
+ source.getSourceName(),
+ fieldInfos,
+ null,
+ properties,
+ source.getPrimaryKey(),
+ source.getHostname(),
+ source.getPort(),
+ source.getUsername(),
+ source.getPassword(),
+ source.getDatabase(),
+ source.getSchemaName(),
+ source.getTableName(),
+ source.getServerTimezone());
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ SQLServerSink sqlServerSink = (SQLServerSink) nodeInfo;
+ Map<String, String> properties = parseProperties(sqlServerSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(sqlServerSink.getSinkFieldList(), sqlServerSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(sqlServerSink.getSinkFieldList(), constantFieldMap);
+
+ return new SqlServerLoadNode(
+ sqlServerSink.getSinkName(),
+ sqlServerSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ sqlServerSink.getJdbcUrl(),
+ sqlServerSink.getUsername(),
+ sqlServerSink.getPassword(),
+ sqlServerSink.getSchemaName(),
+ sqlServerSink.getTableName(),
+ sqlServerSink.getPrimaryKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
new file mode 100644
index 0000000000..588671d370
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/StarRocksProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating StarRocks load nodes.
+ */
+public class StarRocksProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.STARROCKS.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ StarRocksSink starRocksSink = (StarRocksSink) nodeInfo;
+ Map<String, String> properties = parseProperties(starRocksSink.getProperties());
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(starRocksSink.getSinkFieldList(), starRocksSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(starRocksSink.getSinkFieldList(), constantFieldMap);
+
+ Format format = null;
+ if (Boolean.TRUE.equals(starRocksSink.getSinkMultipleEnable())
+ && StringUtils.isNotBlank(starRocksSink.getSinkMultipleFormat())) {
+ DataTypeEnum dataType = DataTypeEnum.forType(starRocksSink.getSinkMultipleFormat());
+ switch (dataType) {
+ case CANAL:
+ format = new CanalJsonFormat();
+ break;
+ case DEBEZIUM_JSON:
+ format = new DebeziumJsonFormat();
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported dataType=%s for StarRocks", dataType));
+ }
+ }
+ return new StarRocksLoadNode(
+ starRocksSink.getSinkName(),
+ starRocksSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ starRocksSink.getJdbcUrl(),
+ starRocksSink.getLoadUrl(),
+ starRocksSink.getUsername(),
+ starRocksSink.getPassword(),
+ starRocksSink.getDatabaseName(),
+ starRocksSink.getTableName(),
+ starRocksSink.getPrimaryKey(),
+ starRocksSink.getSinkMultipleEnable(),
+ format,
+ starRocksSink.getDatabasePattern(),
+ starRocksSink.getTablePattern());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TDSQLPostgreSQLProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TDSQLPostgreSQLProvider.java
new file mode 100644
index 0000000000..0f9f797199
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TDSQLPostgreSQLProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating TDSQLPostgreSQL load nodes.
+ */
+public class TDSQLPostgreSQLProvider implements LoadNodeProvider {
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.TDSQLPOSTGRESQL.equals(sinkType);
+ }
+
+ @Override
+ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) {
+ TDSQLPostgreSQLSink tdsqlPostgreSQLSink = (TDSQLPostgreSQLSink) nodeInfo;
+ Map<String, String> properties = parseProperties(tdsqlPostgreSQLSink.getProperties());
+ List<SinkField> sinkFieldList = tdsqlPostgreSQLSink.getSinkFieldList();
+ List<FieldInfo> fieldInfos = parseSinkFieldInfos(sinkFieldList, tdsqlPostgreSQLSink.getSinkName());
+ List<FieldRelation> fieldRelations = parseSinkFields(sinkFieldList, constantFieldMap);
+
+ return new TDSQLPostgresLoadNode(
+ tdsqlPostgreSQLSink.getSinkName(),
+ tdsqlPostgreSQLSink.getSinkName(),
+ fieldInfos,
+ fieldRelations,
+ null,
+ null,
+ null,
+ properties,
+ tdsqlPostgreSQLSink.getJdbcUrl(),
+ tdsqlPostgreSQLSink.getUsername(),
+ tdsqlPostgreSQLSink.getPassword(),
+ tdsqlPostgreSQLSink.getSchemaName() + "." + tdsqlPostgreSQLSink.getTableName(),
+ tdsqlPostgreSQLSink.getPrimaryKey());
+ }
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
similarity index 86%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index 2a61b385a1..72399221f8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/extract/TubeMqProvider.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.pojo.sort.node.extract;
+package org.apache.inlong.manager.pojo.sort.node.provider;
import org.apache.inlong.manager.common.consts.SourceType;
-import org.apache.inlong.manager.pojo.sort.node.ExtractNodeProvider;
+import org.apache.inlong.manager.pojo.sort.node.base.ExtractNodeProvider;
import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
import org.apache.inlong.manager.pojo.stream.StreamNode;
import org.apache.inlong.sort.protocol.FieldInfo;
@@ -39,9 +39,9 @@ public class TubeMqProvider implements ExtractNodeProvider {
}
@Override
- public ExtractNode createNode(StreamNode streamNodeInfo) {
+ public ExtractNode createExtractNode(StreamNode streamNodeInfo) {
TubeMQSource source = (TubeMQSource) streamNodeInfo;
- List<FieldInfo> fieldInfos = parseFieldInfos(source.getFieldList(), source.getSourceName());
+ List<FieldInfo> fieldInfos = parseStreamFieldInfos(source.getFieldList(), source.getSourceName());
Map<String, String> properties = parseProperties(source.getProperties());
return new TubeMQExtractNode(
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
deleted file mode 100644
index 9209d19b5b..0000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ /dev/null
@@ -1,822 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sort.util;
-
-import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.SinkType;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.sink.SinkField;
-import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
-import org.apache.inlong.manager.pojo.sink.doris.DorisSink;
-import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
-import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
-import org.apache.inlong.manager.pojo.sink.hbase.HBaseSink;
-import org.apache.inlong.manager.pojo.sink.hdfs.HDFSSink;
-import org.apache.inlong.manager.pojo.sink.hive.HivePartitionField;
-import org.apache.inlong.manager.pojo.sink.hive.HiveSink;
-import org.apache.inlong.manager.pojo.sink.hudi.HudiSink;
-import org.apache.inlong.manager.pojo.sink.iceberg.IcebergSink;
-import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
-import org.apache.inlong.manager.pojo.sink.kudu.KuduSink;
-import org.apache.inlong.manager.pojo.sink.mysql.MySQLSink;
-import org.apache.inlong.manager.pojo.sink.mysql.MySQLSinkDTO;
-import org.apache.inlong.manager.pojo.sink.oracle.OracleSink;
-import org.apache.inlong.manager.pojo.sink.postgresql.PostgreSQLSink;
-import org.apache.inlong.manager.pojo.sink.redis.RedisSink;
-import org.apache.inlong.manager.pojo.sink.sqlserver.SQLServerSink;
-import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
-import org.apache.inlong.manager.pojo.sink.tdsqlpostgresql.TDSQLPostgreSQLSink;
-import org.apache.inlong.manager.pojo.stream.StreamField;
-import org.apache.inlong.sort.formats.common.StringTypeInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.constant.HudiConstant;
-import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
-import org.apache.inlong.sort.protocol.node.LoadNode;
-import org.apache.inlong.sort.protocol.node.format.AvroFormat;
-import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.CsvFormat;
-import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
-import org.apache.inlong.sort.protocol.node.format.Format;
-import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
-import org.apache.inlong.sort.protocol.node.format.JsonFormat;
-import org.apache.inlong.sort.protocol.node.format.RawFormat;
-import org.apache.inlong.sort.protocol.node.load.ClickHouseLoadNode;
-import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
-import org.apache.inlong.sort.protocol.node.load.ElasticsearchLoadNode;
-import org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
-import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
-import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
-import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
-import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
-import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
-import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
-import org.apache.inlong.sort.protocol.node.load.KuduLoadNode;
-import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
-import org.apache.inlong.sort.protocol.node.load.OracleLoadNode;
-import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
-import org.apache.inlong.sort.protocol.node.load.RedisLoadNode;
-import org.apache.inlong.sort.protocol.node.load.SqlServerLoadNode;
-import org.apache.inlong.sort.protocol.node.load.StarRocksLoadNode;
-import org.apache.inlong.sort.protocol.node.load.TDSQLPostgresLoadNode;
-import org.apache.inlong.sort.protocol.transformation.ConstantParam;
-import org.apache.inlong.sort.protocol.transformation.FieldRelation;
-import org.apache.inlong.sort.protocol.transformation.FunctionParam;
-import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
-import org.apache.inlong.sort.protocol.transformation.function.CustomFunction;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Util for load node info.
- */
-public class LoadNodeUtils {
-
- /**
- * Create nodes of data load.
- */
- public static List<LoadNode> createLoadNodes(List<StreamSink> streamSinks, Map<String, StreamField> fieldMap) {
- if (CollectionUtils.isEmpty(streamSinks)) {
- return Lists.newArrayList();
- }
- return streamSinks.stream()
- .map(sink -> LoadNodeUtils.createLoadNode(sink, fieldMap))
- .collect(Collectors.toList());
- }
-
- /**
- * Create load node from the stream sink info.
- */
- public static LoadNode createLoadNode(StreamSink streamSink, Map<String, StreamField> constantFieldMap) {
- List<FieldInfo> fieldInfos = streamSink.getSinkFieldList().stream()
- .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, streamSink.getSinkName()))
- .collect(Collectors.toList());
- List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap);
- Map<String, String> properties = streamSink.getProperties().entrySet().stream()
- .filter(v -> Objects.nonNull(v.getValue()))
- .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
- String sinkType = streamSink.getSinkType();
- switch (sinkType) {
- case SinkType.KAFKA:
- return createLoadNode((KafkaSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.HIVE:
- return createLoadNode((HiveSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.HBASE:
- return createLoadNode((HBaseSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.POSTGRESQL:
- return createLoadNode((PostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.CLICKHOUSE:
- return createLoadNode((ClickHouseSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.ICEBERG:
- return createLoadNode((IcebergSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.HUDI:
- return createLoadNode((HudiSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.SQLSERVER:
- return createLoadNode((SQLServerSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.ELASTICSEARCH:
- return createLoadNode((ElasticsearchSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.HDFS:
- return createLoadNode((HDFSSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.GREENPLUM:
- return createLoadNode((GreenplumSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.MYSQL:
- return createLoadNode((MySQLSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.ORACLE:
- return createLoadNode((OracleSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.TDSQLPOSTGRESQL:
- return createLoadNode((TDSQLPostgreSQLSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.DORIS:
- return createLoadNode((DorisSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.STARROCKS:
- return createLoadNode((StarRocksSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.KUDU:
- return createLoadNode((KuduSink) streamSink, fieldInfos, fieldRelations, properties);
- case SinkType.REDIS:
- return createLoadNode((RedisSink) streamSink, fieldInfos, fieldRelations, properties);
- default:
- throw new BusinessException(String.format("Unsupported sinkType=%s to create load node", sinkType));
- }
- }
-
- /**
- * Create load node of Kafka.
- */
- public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- Integer sinkParallelism = null;
- if (StringUtils.isNotEmpty(kafkaSink.getPartitionNum())) {
- sinkParallelism = Integer.parseInt(kafkaSink.getPartitionNum());
- }
- DataTypeEnum dataType = DataTypeEnum.forType(kafkaSink.getSerializationType());
- Format format;
- switch (dataType) {
- case CSV:
- format = new CsvFormat();
- break;
- case AVRO:
- format = new AvroFormat();
- break;
- case JSON:
- format = new JsonFormat();
- break;
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
- break;
- case RAW:
- format = new RawFormat();
- break;
- default:
- throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType));
- }
-
- return new KafkaLoadNode(
- kafkaSink.getSinkName(),
- kafkaSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- Lists.newArrayList(),
- null,
- kafkaSink.getTopicName(),
- kafkaSink.getBootstrapServers(),
- format,
- sinkParallelism,
- properties,
- kafkaSink.getPrimaryKey());
- }
-
- /**
- * Create load node of Hive.
- */
- public static HiveLoadNode createLoadNode(HiveSink hiveSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- List<FieldInfo> partitionFields = Lists.newArrayList();
- if (CollectionUtils.isNotEmpty(hiveSink.getPartitionFieldList())) {
- partitionFields = hiveSink.getPartitionFieldList().stream()
- .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hiveSink.getSinkName(),
- FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
- partitionField.getFieldFormat())))
- .collect(Collectors.toList());
- }
- return new HiveLoadNode(
- hiveSink.getSinkName(),
- hiveSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- Lists.newArrayList(),
- null,
- null,
- properties,
- null,
- hiveSink.getDbName(),
- hiveSink.getTableName(),
- hiveSink.getHiveConfDir(),
- hiveSink.getHiveVersion(),
- null,
- partitionFields);
- }
-
- /**
- * Create load node of HBase.
- */
- public static HbaseLoadNode createLoadNode(HBaseSink hbaseSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new HbaseLoadNode(
- hbaseSink.getSinkName(),
- hbaseSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- Lists.newArrayList(),
- null,
- null,
- properties,
- hbaseSink.getTableName(),
- hbaseSink.getNamespace(),
- hbaseSink.getZkQuorum(),
- hbaseSink.getRowKey(),
- hbaseSink.getBufferFlushMaxSize(),
- hbaseSink.getZkNodeParent(),
- hbaseSink.getBufferFlushMaxRows(),
- hbaseSink.getBufferFlushInterval());
- }
-
- /**
- * Create load node of PostgreSQL.
- */
- public static PostgresLoadNode createLoadNode(PostgreSQLSink postgreSQLSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new PostgresLoadNode(
- postgreSQLSink.getSinkName(),
- postgreSQLSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- postgreSQLSink.getJdbcUrl(),
- postgreSQLSink.getUsername(),
- postgreSQLSink.getPassword(),
- postgreSQLSink.getDbName() + "." + postgreSQLSink.getTableName(),
- postgreSQLSink.getPrimaryKey());
- }
-
- /**
- * Create load node of ClickHouse.
- */
- public static ClickHouseLoadNode createLoadNode(ClickHouseSink ckSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new ClickHouseLoadNode(
- ckSink.getSinkName(),
- ckSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- ckSink.getDbName() + "." + ckSink.getTableName(),
- ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
- ckSink.getUsername(),
- ckSink.getPassword(),
- ckSink.getPrimaryKey());
- }
-
- /**
- * Create load node of Doris.
- */
- public static DorisLoadNode createLoadNode(DorisSink dorisSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- Format format = null;
- if (dorisSink.getSinkMultipleEnable() != null && dorisSink.getSinkMultipleEnable() && StringUtils.isNotBlank(
- dorisSink.getSinkMultipleFormat())) {
- DataTypeEnum dataType = DataTypeEnum.forType(dorisSink.getSinkMultipleFormat());
- switch (dataType) {
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
- break;
- default:
- throw new IllegalArgumentException(String.format("Unsupported dataType=%s for doris", dataType));
- }
- }
- return new DorisLoadNode(
- dorisSink.getSinkName(),
- dorisSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- dorisSink.getFeNodes(),
- dorisSink.getUsername(),
- dorisSink.getPassword(),
- dorisSink.getTableIdentifier(),
- null,
- dorisSink.getSinkMultipleEnable(),
- format,
- dorisSink.getDatabasePattern(),
- dorisSink.getTablePattern());
- }
-
- /**
- * Create load node of StarRocks.
- */
- public static StarRocksLoadNode createLoadNode(StarRocksSink starRocksSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- Format format = null;
- if (starRocksSink.getSinkMultipleEnable() != null && starRocksSink.getSinkMultipleEnable()
- && StringUtils.isNotBlank(
- starRocksSink.getSinkMultipleFormat())) {
- DataTypeEnum dataType = DataTypeEnum.forType(starRocksSink.getSinkMultipleFormat());
- switch (dataType) {
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
- break;
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported dataType=%s for StarRocks", dataType));
- }
- }
- return new StarRocksLoadNode(
- starRocksSink.getSinkName(),
- starRocksSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- starRocksSink.getJdbcUrl(),
- starRocksSink.getLoadUrl(),
- starRocksSink.getUsername(),
- starRocksSink.getPassword(),
- starRocksSink.getDatabaseName(),
- starRocksSink.getTableName(),
- starRocksSink.getPrimaryKey(),
- starRocksSink.getSinkMultipleEnable(),
- format,
- starRocksSink.getDatabasePattern(),
- starRocksSink.getTablePattern());
- }
-
- /**
- * Create load node of Kudu.
- */
- public static KuduLoadNode createLoadNode(
- KuduSink kuduSink,
- List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations,
- Map<String, String> properties) {
- return new KuduLoadNode(
- kuduSink.getSinkName(),
- kuduSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- kuduSink.getMasters(),
- kuduSink.getTableName(),
- kuduSink.getPartitionKey());
- }
-
- private static LoadNode createLoadNode(
- RedisSink redisSink,
- List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations,
- Map<String, String> properties) {
- String clusterMode = redisSink.getClusterMode();
- String dataType = redisSink.getDataType();
- String schemaMapMode = redisSink.getSchemaMapMode();
- String host = redisSink.getHost();
- Integer port = redisSink.getPort();
- String clusterNodes = redisSink.getClusterNodes();
- String masterName = redisSink.getMasterName();
- String sentinelsInfo = redisSink.getSentinelsInfo();
- Integer database = redisSink.getDatabase();
- String password = redisSink.getPassword();
- Integer ttl = redisSink.getTtl();
- Integer timeout = redisSink.getTimeout();
- Integer soTimeout = redisSink.getSoTimeout();
- Integer maxTotal = redisSink.getMaxTotal();
- Integer maxIdle = redisSink.getMaxIdle();
- Integer minIdle = redisSink.getMinIdle();
- Integer maxRetries = redisSink.getMaxRetries();
-
- Format format = parsingFormat(
- redisSink.getFormatDataType(),
- false,
- redisSink.getFormatDataSeparator(),
- false);
-
- return new RedisLoadNode(
- redisSink.getSinkName(),
- redisSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- clusterMode,
- dataType,
- schemaMapMode,
- host,
- port,
- clusterNodes,
- masterName,
- sentinelsInfo,
- database,
- password,
- ttl,
- format,
- timeout,
- soTimeout,
- maxTotal,
- maxIdle,
- minIdle,
- maxRetries);
- }
-
- /**
- * Create load node of Iceberg.
- */
- public static IcebergLoadNode createLoadNode(IcebergSink icebergSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- CatalogType catalogType = CatalogType.forName(icebergSink.getCatalogType());
- return new IcebergLoadNode(
- icebergSink.getSinkName(),
- icebergSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- icebergSink.getDbName(),
- icebergSink.getTableName(),
- icebergSink.getPrimaryKey(),
- catalogType,
- icebergSink.getCatalogUri(),
- icebergSink.getWarehouse());
- }
-
- /**
- * Create load node of Hudi.
- */
- public static HudiLoadNode createLoadNode(HudiSink hudiSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- HudiConstant.CatalogType catalogType = HudiConstant.CatalogType.forName(hudiSink.getCatalogType());
-
- return new HudiLoadNode(
- hudiSink.getSinkName(),
- hudiSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- hudiSink.getDbName(),
- hudiSink.getTableName(),
- hudiSink.getPrimaryKey(),
- catalogType,
- hudiSink.getCatalogUri(),
- hudiSink.getWarehouse(),
- hudiSink.getExtList(),
- hudiSink.getPartitionKey());
- }
-
- /**
- * Create load node of SQLServer.
- */
- public static SqlServerLoadNode createLoadNode(SQLServerSink sqlServerSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new SqlServerLoadNode(
- sqlServerSink.getSinkName(),
- sqlServerSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- sqlServerSink.getJdbcUrl(),
- sqlServerSink.getUsername(),
- sqlServerSink.getPassword(),
- sqlServerSink.getSchemaName(),
- sqlServerSink.getTableName(),
- sqlServerSink.getPrimaryKey());
- }
-
- /**
- * Create Elasticsearch load node
- */
- public static ElasticsearchLoadNode createLoadNode(ElasticsearchSink elasticsearchSink,
- List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
- Format format = null;
- if (elasticsearchSink.getSinkMultipleEnable() != null && elasticsearchSink.getSinkMultipleEnable()
- && StringUtils.isNotBlank(
- elasticsearchSink.getSinkMultipleFormat())) {
- DataTypeEnum dataType = DataTypeEnum.forType(elasticsearchSink.getSinkMultipleFormat());
- switch (dataType) {
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- format = new DebeziumJsonFormat();
- break;
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported dataType=%s for elasticsearch", dataType));
- }
- }
- return new ElasticsearchLoadNode(
- elasticsearchSink.getSinkName(),
- elasticsearchSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- elasticsearchSink.getIndexName(),
- elasticsearchSink.getHosts(),
- elasticsearchSink.getUsername(),
- elasticsearchSink.getPassword(),
- elasticsearchSink.getDocumentType(),
- elasticsearchSink.getPrimaryKey(),
- elasticsearchSink.getEsVersion(),
- elasticsearchSink.getSinkMultipleEnable(),
- format,
- elasticsearchSink.getIndexPattern());
- }
-
- /**
- * Create load node of HDFS.
- */
- public static FileSystemLoadNode createLoadNode(HDFSSink hdfsSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- List<FieldInfo> partitionFields = Lists.newArrayList();
- if (CollectionUtils.isNotEmpty(hdfsSink.getPartitionFieldList())) {
- partitionFields = hdfsSink.getPartitionFieldList().stream()
- .map(partitionField -> new FieldInfo(partitionField.getFieldName(), hdfsSink.getSinkName(),
- FieldInfoUtils.convertFieldFormat(partitionField.getFieldType(),
- partitionField.getFieldFormat())))
- .collect(Collectors.toList());
- }
-
- return new FileSystemLoadNode(
- hdfsSink.getSinkName(),
- hdfsSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- Lists.newArrayList(),
- hdfsSink.getDataPath(),
- hdfsSink.getFileFormat(),
- null,
- properties,
- partitionFields,
- hdfsSink.getServerTimeZone());
- }
-
- /**
- * Create greenplum load node
- */
- public static GreenplumLoadNode createLoadNode(GreenplumSink greenplumSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new GreenplumLoadNode(
- greenplumSink.getSinkName(),
- greenplumSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- greenplumSink.getJdbcUrl(),
- greenplumSink.getUsername(),
- greenplumSink.getPassword(),
- greenplumSink.getTableName(),
- greenplumSink.getPrimaryKey());
- }
-
- /**
- * Create load node of MySQL.
- */
- public static MySqlLoadNode createLoadNode(MySQLSink mysqlSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new MySqlLoadNode(
- mysqlSink.getSinkName(),
- mysqlSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- Lists.newArrayList(),
- null,
- null,
- properties,
- MySQLSinkDTO.setDbNameToUrl(mysqlSink.getJdbcUrl(), mysqlSink.getDatabaseName()),
- mysqlSink.getUsername(),
- mysqlSink.getPassword(),
- mysqlSink.getTableName(),
- mysqlSink.getPrimaryKey());
- }
-
- /**
- * Create load node of ORACLE.
- */
- public static OracleLoadNode createLoadNode(OracleSink oracleSink, List<FieldInfo> fieldInfos,
- List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new OracleLoadNode(
- oracleSink.getSinkName(),
- oracleSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- oracleSink.getJdbcUrl(),
- oracleSink.getUsername(),
- oracleSink.getPassword(),
- oracleSink.getTableName(),
- oracleSink.getPrimaryKey());
- }
-
- /**
- * Create load node of TDSQLPostgreSQL.
- */
- public static TDSQLPostgresLoadNode createLoadNode(TDSQLPostgreSQLSink tdsqlPostgreSQLSink,
- List<FieldInfo> fieldInfos, List<FieldRelation> fieldRelations, Map<String, String> properties) {
- return new TDSQLPostgresLoadNode(
- tdsqlPostgreSQLSink.getSinkName(),
- tdsqlPostgreSQLSink.getSinkName(),
- fieldInfos,
- fieldRelations,
- null,
- null,
- null,
- properties,
- tdsqlPostgreSQLSink.getJdbcUrl(),
- tdsqlPostgreSQLSink.getUsername(),
- tdsqlPostgreSQLSink.getPassword(),
- tdsqlPostgreSQLSink.getSchemaName() + "." + tdsqlPostgreSQLSink.getTableName(),
- tdsqlPostgreSQLSink.getPrimaryKey());
- }
-
- /**
- * Parse information field of data sink.
- */
- public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList,
- Map<String, StreamField> constantFieldMap) {
- if (CollectionUtils.isEmpty(fieldList)) {
- return Lists.newArrayList();
- }
- return fieldList.stream()
- .filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName()))
- .map(field -> {
- FieldInfo outputField = new FieldInfo(field.getFieldName(),
- FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat()));
- FunctionParam inputField;
- String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName());
- StreamField constantField = constantFieldMap.get(fieldKey);
- if (constantField != null) {
- if (outputField.getFormatInfo() != null
- && outputField.getFormatInfo().getTypeInfo() == StringTypeInfo.INSTANCE) {
- inputField = new StringConstantParam(constantField.getFieldValue());
- } else {
- inputField = new ConstantParam(constantField.getFieldValue());
- }
- } else if (FieldType.FUNCTION.name().equalsIgnoreCase(field.getSourceFieldType())) {
- inputField = new CustomFunction(field.getSourceFieldName());
- } else {
- inputField = new FieldInfo(field.getSourceFieldName(), field.getOriginNodeName(),
- FieldInfoUtils.convertFieldFormat(field.getSourceFieldType()));
- }
- return new FieldRelation(inputField, outputField);
- }).collect(Collectors.toList());
- }
-
- /**
- * Check the validation of Hive partition field.
- */
- public static void checkPartitionField(List<SinkField> fieldList, List<HivePartitionField> partitionList) {
- if (CollectionUtils.isEmpty(partitionList)) {
- return;
- }
- if (CollectionUtils.isEmpty(fieldList)) {
- throw new BusinessException(ErrorCodeEnum.SINK_FIELD_LIST_IS_EMPTY);
- }
-
- Map<String, SinkField> sinkFieldMap = new HashMap<>(fieldList.size());
- fieldList.forEach(field -> sinkFieldMap.put(field.getFieldName(), field));
-
- for (HivePartitionField partitionField : partitionList) {
- String fieldName = partitionField.getFieldName();
- if (StringUtils.isBlank(fieldName)) {
- throw new BusinessException(ErrorCodeEnum.PARTITION_FIELD_NAME_IS_EMPTY);
- }
-
- SinkField sinkField = sinkFieldMap.get(fieldName);
- if (sinkField == null) {
- throw new BusinessException(
- String.format(ErrorCodeEnum.PARTITION_FIELD_NOT_FOUND.getMessage(), fieldName));
- }
- if (StringUtils.isBlank(sinkField.getSourceFieldName())) {
- throw new BusinessException(
- String.format(ErrorCodeEnum.PARTITION_FIELD_NO_SOURCE_FIELD.getMessage(), fieldName));
- }
- }
- }
-
- /**
- * Parse format
- *
- * @param formatName data serialization, support: csv, json, canal, avro, etc
- * @param wrapWithInlongMsg whether wrap content with {@link InLongMsgFormat}
- * @param separatorStr the separator of data content
- * @param ignoreParseErrors whether ignore deserialization error data
- * @return the format for serialized content
- */
- private static Format parsingFormat(
- String formatName,
- boolean wrapWithInlongMsg,
- String separatorStr,
- boolean ignoreParseErrors) {
- Format format;
- DataTypeEnum dataType = DataTypeEnum.forType(formatName);
- switch (dataType) {
- case CSV:
- if (StringUtils.isNumeric(separatorStr)) {
- char dataSeparator = (char) Integer.parseInt(separatorStr);
- separatorStr = Character.toString(dataSeparator);
- }
- CsvFormat csvFormat = new CsvFormat(separatorStr);
- csvFormat.setIgnoreParseErrors(ignoreParseErrors);
- format = csvFormat;
- break;
- case AVRO:
- format = new AvroFormat();
- break;
- case JSON:
- JsonFormat jsonFormat = new JsonFormat();
- jsonFormat.setIgnoreParseErrors(ignoreParseErrors);
- format = jsonFormat;
- break;
- case CANAL:
- format = new CanalJsonFormat();
- break;
- case DEBEZIUM_JSON:
- DebeziumJsonFormat debeziumJsonFormat = new DebeziumJsonFormat();
- debeziumJsonFormat.setIgnoreParseErrors(ignoreParseErrors);
- format = debeziumJsonFormat;
- break;
- case RAW:
- format = new RawFormat();
- break;
- default:
- throw new IllegalArgumentException(String.format("Unsupported dataType=%s", dataType));
- }
- if (wrapWithInlongMsg) {
- Format innerFormat = format;
- format = new InLongMsgFormat(innerFormat, false);
- }
- return format;
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index d23f08ddf3..8c58dd1985 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -21,8 +21,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.sort.node.NodeProviderFactory;
-import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
+import org.apache.inlong.manager.pojo.sort.node.NodeFactory;
import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
import org.apache.inlong.manager.pojo.source.StreamSource;
@@ -211,9 +210,9 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
private List<Node> createNodes(List<StreamSource> sources, List<TransformResponse> transformResponses,
List<StreamSink> sinks, Map<String, StreamField> constantFieldMap) {
List<Node> nodes = new ArrayList<>();
- nodes.addAll(NodeProviderFactory.createExtractNodes(sources));
+ nodes.addAll(NodeFactory.createExtractNodes(sources));
nodes.addAll(TransformNodeUtils.createTransformNodes(transformResponses, constantFieldMap));
- nodes.addAll(LoadNodeUtils.createLoadNodes(sinks, constantFieldMap));
+ nodes.addAll(NodeFactory.createLoadNodes(sinks, constantFieldMap));
return nodes;
}