You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/05 03:53:57 UTC
[incubator-seatunnel] branch dev updated: [Connector-V2] Add Kudu source and sink connector (#2254)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0483cbc2d [Connector-V2] Add Kudu source and sink connector (#2254)
0483cbc2d is described below
commit 0483cbc2df2458db24beec61d71b6d62520b4dea
Author: liuyehan <81...@users.noreply.github.com>
AuthorDate: Fri Aug 5 11:53:52 2022 +0800
[Connector-V2] Add Kudu source and sink connector (#2254)
* 0
* Update pom.xml
add kudu dependency
* Update plugin-mapping.properties
add kudu config
* Update pom.xml
add kudu
* add email sink connector
* Delete seatunnel-connectors-v2/connector-email directory
* Update plugin-mapping.properties
* Update plugin-mapping.properties
* Update pom.xml
* Create pom.xml
* [Connector-V2] Add Kudu source and sink connector
* [Connector-V2] Add Kudu source and sink connector
* [Connector-V2] update
* [Connector-V2] solve ci error
* [Connector-V2] fix problem on code review
* [Connector-V2] add kudu usage document and fix problem on code review
* Update ExceptionUtil.java
* Update pom.xml
* Update Kudu.md
* Update ExceptionUtil.java
Co-authored-by: Hisoka <fa...@qq.com>
---
docs/en/connector-v2/sink/Kudu.md | 39 ++++
docs/en/connector-v2/source/Kudu.md | 41 +++++
plugin-mapping.properties | 2 +
pom.xml | 7 +
.../org/apache/seatunnel/common/ExceptionUtil.java | 37 ++++
seatunnel-connectors-v2-dist/pom.xml | 5 +
.../{ => connector-kudu}/pom.xml | 60 +++---
.../seatunnel/kudu/config/KuduSinkConfig.java | 64 +++++++
.../seatunnel/kudu/config/KuduSourceConfig.java | 29 +++
.../seatunnel/kudu/kuduclient/KuduInputFormat.java | 202 +++++++++++++++++++++
.../kudu/kuduclient/KuduOutputFormat.java | 164 +++++++++++++++++
.../seatunnel/kudu/kuduclient/KuduTypeMapper.java | 103 +++++++++++
.../connectors/seatunnel/kudu/sink/KuduSink.java | 69 +++++++
.../seatunnel/kudu/sink/KuduSinkWriter.java | 64 +++++++
.../seatunnel/kudu/source/KuduSource.java | 191 +++++++++++++++++++
.../seatunnel/kudu/source/KuduSourceReader.java | 107 +++++++++++
.../seatunnel/kudu/source/KuduSourceSplit.java | 39 ++++
.../kudu/source/KuduSourceSplitEnumerator.java | 131 +++++++++++++
.../seatunnel/kudu/source/PartitionParameter.java | 32 ++++
.../seatunnel/kudu/state/KuduSourceState.java | 23 +++
.../src/main/resources/kudu_to_kudu_flink.conf | 60 ++++++
.../src/main/resources/kudu_to_kudu_spark.conf | 64 +++++++
seatunnel-connectors-v2/pom.xml | 1 +
23 files changed, 1500 insertions(+), 34 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kudu.md b/docs/en/connector-v2/sink/Kudu.md
new file mode 100644
index 000000000..4ccb993dc
--- /dev/null
+++ b/docs/en/connector-v2/sink/Kudu.md
@@ -0,0 +1,39 @@
+# Kudu
+
+## Description
+
+Write data to Kudu.
+
+ The tested kudu version is 1.11.1.
+
+## Options
+
+| name | type | required | default value |
+|--------------------------|---------|----------|---------------|
+| kudu_master | string | yes | - |
+| kudu_table | string | yes | - |
+| save_mode | string | yes | - |
+
+### kudu_master [string]
+
+`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.
+
+### kudu_table [string]
+
+`kudu_table` The name of kudu table..
+
+### save_mode [string]
+
+Storage mode, we need support `overwrite` and `append`. `append` is now supported.
+
+## Example
+
+```bash
+
+ kuduSink {
+ kudu_master = "192.168.88.110:7051"
+ kudu_table = "studentlyhresultflink"
+ save_mode="append"
+ }
+
+```
diff --git a/docs/en/connector-v2/source/Kudu.md b/docs/en/connector-v2/source/Kudu.md
new file mode 100644
index 000000000..7b85bc7cd
--- /dev/null
+++ b/docs/en/connector-v2/source/Kudu.md
@@ -0,0 +1,41 @@
+# Kudu
+
+## Description
+
+Used to read data from Kudu. Currently, only supports Query with Batch Mode.
+
+ The tested kudu version is 1.11.1.
+
+## Options
+
+| name | type | required | default value |
+|--------------------------|---------|----------|---------------|
+| kudu_master | string | yes | - |
+| kudu_table | string | yes | - |
+| columnsList | string | yes | - |
+
+### kudu_master [string]
+
+`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.
+
+### kudu_table [string]
+
+`kudu_table` The name of kudu table..
+
+### columnsList [string]
+
+`columnsList` Specifies the column names of the table.
+
+## Examples
+
+```hocon
+source {
+ KuduSource {
+ result_table_name = "studentlyh2"
+ kudu_master = "192.168.88.110:7051"
+ kudu_table = "studentlyh2"
+ columnsList = "id,name,age,sex"
+ }
+
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c448b86bc..c6c3f16f5 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -103,6 +103,8 @@ seatunnel.sink.Clickhouse = connector-clickhouse
seatunnel.sink.ClickhouseFile = connector-clickhouse
seatunnel.source.Jdbc = connector-jdbc
seatunnel.sink.Jdbc = connector-jdbc
+seatunnel.source.Kudu = connector-kudu
+seatunnel.sink.Kudu = connector-kudu
seatunnel.sink.Email = connector-email
seatunnel.sink.HdfsFile = connector-file-hadoop
seatunnel.sink.LocalFile = connector-file-local
diff --git a/pom.xml b/pom.xml
index 39927e48b..188a1ba57 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
<mongo-spark.version>2.2.0</mongo-spark.version>
<spark-redis.version>2.6.0</spark-redis.version>
<commons-lang3.version>3.4</commons-lang3.version>
+ <kudu.version>1.11.1</kudu.version>
<email.version>1.5.6</email.version>
<commons-collections4.version>4.4</commons-collections4.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
@@ -263,6 +264,12 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
+ </dependency>
+ <!--kudu -->
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
</dependency>
<!--email -->
<dependency>
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/ExceptionUtil.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/ExceptionUtil.java
new file mode 100644
index 000000000..7ad7e7fc8
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/ExceptionUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+public class ExceptionUtil {
+ public static String getMessage(Throwable e) {
+ try (StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw)){
+ // Output the error stack information to the printWriter
+ e.printStackTrace(pw);
+ pw.flush();
+ sw.flush();
+ return sw.toString();
+ } catch (Exception e1) {
+ e1.printStackTrace();
+ throw new RuntimeException("Failed to print exception logs", e1);
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index dd8d105bb..336c92b74 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -101,6 +101,11 @@
<artifactId>connector-dingtalk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-kudu</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-email</artifactId>
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/connector-kudu/pom.xml
similarity index 53%
copy from seatunnel-connectors-v2/pom.xml
copy to seatunnel-connectors-v2/connector-kudu/pom.xml
index b5ed52de7..6a3e5238d 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/connector-kudu/pom.xml
@@ -17,50 +17,42 @@
limitations under the License.
-->
-
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel</artifactId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <artifactId>seatunnel-connectors-v2</artifactId>
- <modules>
- <module>connector-common</module>
- <module>connector-clickhouse</module>
- <module>connector-console</module>
- <module>connector-fake</module>
- <module>connector-http</module>
- <module>connector-jdbc</module>
- <module>connector-kafka</module>
- <module>connector-pulsar</module>
- <module>connector-socket</module>
- <module>connector-hive</module>
- <module>connector-file</module>
- <module>connector-hudi</module>
- <module>connector-assert</module>
- <module>connector-email</module>
- <module>connector-dingtalk</module>
- </modules>
+ <artifactId>connector-kudu</artifactId>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project>
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
new file mode 100644
index 000000000..b44a3e730
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
+
+@Data
+public class KuduSinkConfig {
+
+ private static final String KUDU_SAVE_MODE = "save_mode";
+ private static final String KUDU_MASTER = "kudu_master";
+ private static final String KUDU_TABLE_NAME = "kudu_table";
+
+ private SaveMode saveMode = SaveMode.APPEND;
+
+ private String kuduMaster;
+
+ /**
+ * Specifies the name of the table
+ */
+ private String kuduTableName;
+
+ public enum SaveMode {
+ APPEND(),
+ OVERWRITE();
+
+ public static SaveMode fromStr(String str) {
+ if ("overwrite".equals(str)) {
+ return OVERWRITE;
+ } else {
+ return APPEND;
+ }
+ }
+ }
+
+ public KuduSinkConfig(@NonNull Config pluginConfig) {
+ if (pluginConfig.hasPath(KUDU_SAVE_MODE) && pluginConfig.hasPath(KUDU_MASTER) && pluginConfig.hasPath(KUDU_TABLE_NAME)) {
+ this.saveMode = StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE)) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE));
+ this.kuduMaster = pluginConfig.getString(KUDU_MASTER);
+ this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME);
+ } else {
+ throw new RuntimeException("Missing Sink configuration parameters");
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
new file mode 100644
index 000000000..754b53114
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import java.io.Serializable;
+
+public class KuduSourceConfig implements Serializable {
+
+ public static final String KUDUMASTER = "kudu_master";
+ public static final String TABLENAME = "kudu_table";
+ public static final String COLUMNSLIST = "columnsList";
+
+
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
new file mode 100644
index 000000000..d00cc2bd3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.ExceptionUtil;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KuduInputFormat implements Serializable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KuduInputFormat.class);
+
+ public KuduInputFormat(String kuduMaster, String tableName, String columnsList) {
+ this.kuduMaster = kuduMaster;
+ this.columnsList = Arrays.asList(columnsList.split(","));
+ this.tableName = tableName;
+
+ }
+
+ /**
+ * Declare the global variable KuduClient and use it to manipulate the Kudu table
+ */
+ public KuduClient kuduClient;
+
+ /**
+ * Specify kuduMaster address
+ */
+ public String kuduMaster;
+ public List<String> columnsList;
+ public Schema schema;
+ public String keyColumn;
+ public static final int TIMEOUTMS = 18000;
+
+ /**
+ * Specifies the name of the table
+ */
+ public String tableName;
+
+ public List<ColumnSchema> getColumnsSchemas() {
+ List<ColumnSchema> columns = null;
+ try {
+ schema = kuduClient.openTable(tableName).getSchema();
+ keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
+ columns = schema.getColumns();
+ } catch (KuduException e) {
+ LOGGER.warn("get table Columns Schemas Fail.", e);
+ throw new RuntimeException("get table Columns Schemas Fail..", e);
+ }
+ return columns;
+ }
+
+ public static SeaTunnelRow getSeaTunnelRowData(RowResult rs, SeaTunnelRowType typeInfo) throws SQLException {
+
+ List<Object> fields = new ArrayList<>();
+ SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
+ for (int i = 0; i < seaTunnelDataTypes.length; i++) {
+ Object seatunnelField;
+ SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
+ if (null == rs.getObject(i)) {
+ seatunnelField = null;
+ } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getBoolean(i);
+ } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getByte(i);
+ } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getShort(i);
+ } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getInt(i);
+ } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getLong(i);
+ } else if (seaTunnelDataType instanceof DecimalType) {
+ Object value = rs.getObject(i);
+ seatunnelField = value instanceof BigInteger ?
+ new BigDecimal((BigInteger) value, 0)
+ : value;
+ } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getFloat(i);
+ } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getDouble(i);
+ } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
+ seatunnelField = rs.getString(i);
+ } else {
+ throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
+ }
+ fields.add(seatunnelField);
+ }
+
+ return new SeaTunnelRow(fields.toArray());
+ }
+
+ public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
+
+ ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+ ArrayList<String> fieldNames = new ArrayList<>();
+ try {
+
+ for (int i = 0; i < columnSchemaList.size(); i++) {
+ fieldNames.add(columnSchemaList.get(i).getName());
+ seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+ }
+ } catch (Exception e) {
+ LOGGER.warn("get row type info exception.", e);
+ throw new PrepareFailException("kudu", PluginType.SOURCE, ExceptionUtil.getMessage(e));
+ }
+ return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+ }
+
+ public void openInputFormat() {
+ KuduClient.KuduClientBuilder kuduClientBuilder = new
+ KuduClient.KuduClientBuilder(kuduMaster);
+ kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
+
+ kuduClient = kuduClientBuilder.build();
+
+ LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+
+ }
+
+ /**
+ * @param lowerBound The beginning of each slice
+ * @param upperBound End of each slice
+ * @return Get the kuduScanner object for each slice
+ */
+
+ public KuduScanner getKuduBuildSplit(int lowerBound, int upperBound) {
+ KuduScanner kuduScanner = null;
+ try {
+ KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+ kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+
+ kuduScannerBuilder.setProjectedColumnNames(columnsList);
+
+ KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
+ schema.getColumn("" + keyColumn),
+ KuduPredicate.ComparisonOp.GREATER_EQUAL,
+ lowerBound);
+
+ KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
+ schema.getColumn("" + keyColumn),
+ KuduPredicate.ComparisonOp.LESS,
+ upperBound);
+
+ kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
+ .addPredicate(upperPred).build();
+ } catch (KuduException e) {
+ LOGGER.warn("get the Kuduscan object for each splice exception", e);
+ throw new RuntimeException("get the Kuduscan object for each splice exception.", e);
+ }
+ return kuduScanner;
+ }
+
+ public void closeInputFormat() {
+ if (kuduClient != null) {
+ try {
+ kuduClient.close();
+ } catch (KuduException e) {
+ LOGGER.warn("Kudu Client close failed.", e);
+ throw new RuntimeException("Kudu Client close failed.", e);
+ } finally {
+ kuduClient = null;
+ }
+ }
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
new file mode 100644
index 000000000..f73b191ca
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.SessionConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+
+/**
+ * A Kudu outputFormat
+ */
+public class KuduOutputFormat
+ implements Serializable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
+
+ private String kuduMaster;
+ private String kuduTableName;
+ private KuduClient kuduClient;
+ private KuduSession kuduSession;
+ private KuduTable kuduTable;
+ public static final long TIMEOUTMS = 18000;
+ public static final long SESSIONTIMEOUTMS = 100000;
+ public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
+ this.kuduMaster = kuduSinkConfig.getKuduMaster();
+ this.kuduTableName = kuduSinkConfig.getKuduTableName();
+ init();
+ }
+
+ public void write(SeaTunnelRow element) {
+
+ Insert insert = kuduTable.newInsert();
+ Schema schema = kuduTable.getSchema();
+
+ int columnCount = schema.getColumnCount();
+ PartialRow row = insert.getRow();
+ for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
+ ColumnSchema col = schema.getColumnByIndex(columnIndex);
+ try {
+ switch (col.getType()) {
+ case BOOL:
+ row.addBoolean(columnIndex, (Boolean) element.getField(columnIndex));
+ break;
+ case INT8:
+ row.addByte(columnIndex, (Byte) element.getField(columnIndex));
+ break;
+ case INT16:
+ row.addShort(columnIndex, (Short) element.getField(columnIndex));
+ break;
+ case INT32:
+ row.addInt(columnIndex, (Integer) element.getField(columnIndex));
+ break;
+ case INT64:
+ row.addLong(columnIndex, (Long) element.getField(columnIndex));
+ break;
+ case UNIXTIME_MICROS:
+ if (element.getField(columnIndex) instanceof Timestamp) {
+ row.addTimestamp(columnIndex, (Timestamp) element.getField(columnIndex));
+ } else {
+ row.addLong(columnIndex, (Long) element.getField(columnIndex));
+ }
+ break;
+ case FLOAT:
+ row.addFloat(columnIndex, (Float) element.getField(columnIndex));
+ break;
+ case DOUBLE:
+ row.addDouble(columnIndex, (Double) element.getField(columnIndex));
+ break;
+ case STRING:
+ row.addString(columnIndex, element.getField(columnIndex).toString());
+ break;
+ case BINARY:
+ if (element.getField(columnIndex) instanceof byte[]) {
+ row.addBinary(columnIndex, (byte[]) element.getField(columnIndex));
+ } else {
+ row.addBinary(columnIndex, (ByteBuffer) element.getField(columnIndex));
+ }
+ break;
+ case DECIMAL:
+ row.addDecimal(columnIndex, (BigDecimal) element.getField(columnIndex));
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported column type: " + col.getType());
+ }
+ } catch (ClassCastException e) {
+ e.printStackTrace();
+ throw new IllegalArgumentException(
+ "Value type does not match column type " + col.getType() +
+ " for column " + col.getName());
+ }
+
+ }
+
+ try {
+ kuduSession.apply(insert);
+ } catch (KuduException e) {
+ LOGGER.warn("kudu session insert data fail.", e);
+ throw new RuntimeException("kudu session insert data fail.", e);
+ }
+
+ }
+
+ public void init() {
+ KuduClient.KuduClientBuilder kuduClientBuilder = new
+ KuduClient.KuduClientBuilder(kuduMaster);
+ kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
+ this.kuduClient = kuduClientBuilder.build();
+ this.kuduSession = kuduClient.newSession();
+ this.kuduSession.setTimeoutMillis(SESSIONTIMEOUTMS);
+ this.kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+ try {
+ kuduTable = kuduClient.openTable(kuduTableName);
+ } catch (KuduException e) {
+ LOGGER.warn("Failed to initialize the Kudu client.", e);
+ throw new RuntimeException("Failed to initialize the Kudu client.", e);
+ }
+ LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+ }
+
+ public void closeOutputFormat() {
+ if (kuduClient != null) {
+ try {
+ kuduClient.close();
+ kuduSession.close();
+ } catch (KuduException e) {
+ LOGGER.warn("Kudu Client close failed.", e);
+ } finally {
+ kuduClient = null;
+ kuduSession = null;
+ }
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
new file mode 100644
index 000000000..cea22dfb2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import org.apache.kudu.ColumnSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.List;
+
+public class KuduTypeMapper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KuduTypeMapper.class);
+
+ // ============================data types=====================
+
+ private static final String KUDU_UNKNOWN = "UNKNOWN";
+ private static final String KUDU_BIT = "BOOL";
+
+ // -------------------------number----------------------------
+ private static final String KUDU_TINYINT = "INT8";
+ private static final String KUDU_MEDIUMINT = "INT32";
+ private static final String KUDU_INT = "INT16";
+ private static final String KUDU_BIGINT = "INT64";
+
+ private static final String KUDU_FLOAT = "FLOAT";
+
+ private static final String KUDU_DOUBLE = "DOUBLE";
+ private static final String KUDU_DECIMAL = "DECIMAL32";
+
+
+ // -------------------------string----------------------------
+
+ private static final String KUDU_VARCHAR = "STRING";
+
+
+ // ------------------------------time-------------------------
+
+ private static final String KUDU_UNIXTIME_MICROS = "UNIXTIME_MICROS";
+
+
+ // ------------------------------blob-------------------------
+
+ private static final String KUDU_BINARY = "BINARY";
+ private static final int PRECISION = 20;
+ public static SeaTunnelDataType<?> mapping(List<ColumnSchema> columnSchemaList, int colIndex) throws SQLException {
+ String kuduType = columnSchemaList.get(colIndex).getType().getName().toUpperCase();
+ switch (kuduType) {
+ case KUDU_BIT:
+ return BasicType.BOOLEAN_TYPE;
+ case KUDU_TINYINT:
+ case KUDU_MEDIUMINT:
+ case KUDU_INT:
+ return BasicType.INT_TYPE;
+ case KUDU_BIGINT:
+ return BasicType.LONG_TYPE;
+ case KUDU_DECIMAL:
+ return new DecimalType(PRECISION, 0);
+ case KUDU_FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case KUDU_DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+
+ case KUDU_VARCHAR:
+ return BasicType.STRING_TYPE;
+ case KUDU_UNIXTIME_MICROS:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case KUDU_BINARY:
+ return PrimitiveByteArrayType.INSTANCE;
+
+ //Doesn't support yet
+
+ case KUDU_UNKNOWN:
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Doesn't support KUDU type '%s' .",
+ kuduType));
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
new file mode 100644
index 000000000..2f907ae49
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+/**
+ * Kudu Sink implementation by using SeaTunnel sink API.
+ * This class contains the method to create {@link AbstractSimpleSink}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class KuduSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+
+ private Config config;
+ private SeaTunnelRowType seaTunnelRowType;
+
+ @Override
+ public String getPluginName() {
+ return "kuduSink";
+ }
+
+ @Override
+ public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+ return this.seaTunnelRowType;
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ this.config = pluginConfig;
+ }
+
+ @Override
+ public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
+ return new KuduSinkWriter(seaTunnelRowType, config);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
new file mode 100644
index 000000000..d78f48618
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduOutputFormat;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class KuduSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KuduSinkWriter.class);
+
+ private SeaTunnelRowType seaTunnelRowType;
+ private Config pluginConfig;
+
+
+ private KuduOutputFormat fileWriter;
+
+ private KuduSinkConfig kuduSinkConfig;
+
+ public KuduSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
+ @NonNull Config pluginConfig) {
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.pluginConfig = pluginConfig;
+
+ kuduSinkConfig = new KuduSinkConfig(this.pluginConfig);
+ fileWriter = new KuduOutputFormat(kuduSinkConfig);
+
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ fileWriter.write(element);
+ }
+
+ @Override
+ public void close() throws IOException {
+ fileWriter.closeOutputFormat();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
new file mode 100644
index 000000000..65b004a6a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@AutoService(SeaTunnelSource.class)
+public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSourceState> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
+
+ private SeaTunnelContext seaTunnelContext;
+ private SeaTunnelRowType rowTypeInfo;
+ private KuduInputFormat kuduInputFormat;
+ private PartitionParameter partitionParameter;
+ public static final int TIMEOUTMS = 18000;
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelRowType getProducedType() {
+ return this.rowTypeInfo;
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, KuduSourceSplit> createReader(SourceReader.Context readerContext) {
+ return new KuduSourceReader(kuduInputFormat, readerContext);
+ }
+
+ @Override
+ public Serializer<KuduSourceSplit> getSplitSerializer() {
+ return SeaTunnelSource.super.getSplitSerializer();
+ }
+
+ @Override
+ public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> createEnumerator(
+ SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext) {
+ return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter);
+ }
+
+ @Override
+ public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> restoreEnumerator(
+ SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSourceState checkpointState) {
+ // todo:
+ return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter);
+ }
+
+ @Override
+ public Serializer<KuduSourceState> getEnumeratorStateSerializer() {
+ return new DefaultSerializer<>();
+ }
+
+ @Override
+ public String getPluginName() {
+ return "KuduSource";
+ }
+
+ @Override
+ public void prepare(Config config) {
+ String kudumaster = "";
+ String tableName = "";
+ String columnslist = "";
+ if (config.hasPath(KuduSourceConfig.KUDUMASTER) && config.hasPath(KuduSourceConfig.KUDUMASTER) && config.hasPath(KuduSourceConfig.KUDUMASTER)) {
+ kudumaster = config.getString(KuduSourceConfig.KUDUMASTER);
+ tableName = config.getString(KuduSourceConfig.TABLENAME);
+ columnslist = config.getString(KuduSourceConfig.COLUMNSLIST);
+ kuduInputFormat = new KuduInputFormat(kudumaster, tableName, columnslist);
+ } else {
+ throw new RuntimeException("Missing Source configuration parameters");
+ }
+ try {
+ KuduClient.KuduClientBuilder kuduClientBuilder = new
+ KuduClient.KuduClientBuilder(kudumaster);
+ kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
+
+ KuduClient kuduClient = kuduClientBuilder.build();
+ partitionParameter = initPartitionParameter(kuduClient, tableName);
+ SeaTunnelRowType seaTunnelRowType = getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
+ rowTypeInfo = seaTunnelRowType;
+ } catch (KuduException e) {
+ throw new RuntimeException("Parameters in the preparation phase fail to be generated", e);
+ }
+ }
+
+ private PartitionParameter initPartitionParameter(KuduClient kuduClient, String tableName) {
+ String keyColumn = null;
+ int maxKey = 0;
+ int minKey = 0;
+ boolean flag = true;
+ try {
+ KuduScanner.KuduScannerBuilder kuduScannerBuilder =
+ kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
+ ArrayList<String> columnsList = new ArrayList<String>();
+ keyColumn = kuduClient.openTable(tableName).getSchema().getPrimaryKeyColumns().get(0).getName();
+ columnsList.add("" + keyColumn);
+ kuduScannerBuilder.setProjectedColumnNames(columnsList);
+ KuduScanner kuduScanner = kuduScannerBuilder.build();
+ while (kuduScanner.hasMoreRows()) {
+ RowResultIterator rowResults = kuduScanner.nextRows();
+ while (rowResults.hasNext()) {
+ RowResult row = rowResults.next();
+ int id = row.getInt("" + keyColumn);
+ if (flag) {
+ maxKey = id;
+ minKey = id;
+ flag = false;
+ } else {
+ if (id >= maxKey) {
+ maxKey = id;
+ }
+ if (id <= minKey) {
+ minKey = id;
+ }
+ }
+ }
+ }
+ } catch (KuduException e) {
+ throw new RuntimeException("Failed to generate upper and lower limits for each partition", e);
+ }
+ return new PartitionParameter(keyColumn, Long.parseLong(minKey + ""), Long.parseLong(maxKey + ""));
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+
+ public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
+ ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+ ArrayList<String> fieldNames = new ArrayList<>();
+ try {
+
+ for (int i = 0; i < columnSchemaList.size(); i++) {
+ fieldNames.add(columnSchemaList.get(i).getName());
+ seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
+ }
+
+ } catch (Exception e) {
+ LOGGER.warn("get row type info exception", e);
+ throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString());
+ }
+ return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
new file mode 100644
index 000000000..e3c3cd25d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.RowResultIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+public class KuduSourceReader implements SourceReader<SeaTunnelRow, KuduSourceSplit> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KuduSourceReader.class);
+
+ private final SourceReader.Context context;
+
+ private final KuduInputFormat kuduInputFormat;
+ Deque<KuduSourceSplit> splits = new LinkedList<>();
+
+ boolean noMoreSplit;
+
+ public KuduSourceReader(KuduInputFormat kuduInputFormat, SourceReader.Context context) {
+ this.context = context;
+ this.kuduInputFormat = kuduInputFormat;
+ }
+
+ @Override
+ public void open() {
+ kuduInputFormat.openInputFormat();
+ }
+
+ @Override
+ public void close() {
+ kuduInputFormat.closeInputFormat();
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ KuduSourceSplit split = splits.poll();
+ Object[] parameterValues = split.parameterValues;
+ int lowerBound = Integer.parseInt(parameterValues[0].toString());
+ int upperBound = Integer.parseInt(parameterValues[1].toString());
+ List<ColumnSchema> columnSchemaList = kuduInputFormat.getColumnsSchemas();
+ KuduScanner kuduScanner = kuduInputFormat.getKuduBuildSplit(lowerBound, upperBound);
+ //
+ while (kuduScanner.hasMoreRows()) {
+ RowResultIterator rowResults = kuduScanner.nextRows();
+ while (rowResults.hasNext()) {
+ RowResult rowResult = rowResults.next();
+ SeaTunnelRow seaTunnelRow = KuduInputFormat.getSeaTunnelRowData(rowResult, kuduInputFormat.getSeaTunnelRowType(columnSchemaList));
+ output.collect(seaTunnelRow);
+ }
+ }
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ LOGGER.info("Closed the bounded fake source");
+ context.signalNoMoreElement();
+ }
+
+ }
+
+ @Override
+ public List<KuduSourceSplit> snapshotState(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public void addSplits(List<KuduSourceSplit> splits) {
+ this.splits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java
new file mode 100644
index 000000000..4f1320f73
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class KuduSourceSplit implements SourceSplit {
+
+ private static final long serialVersionUID = -1L;
+
+ Object[] parameterValues;
+ public final Integer splitId;
+
+ @Override
+ public String splitId() {
+ return splitId.toString();
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
new file mode 100644
index 000000000..a2e54fef6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class KuduSourceSplitEnumerator implements SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> {
+
+ private final SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext;
+ private PartitionParameter partitionParameter;
+ List<KuduSourceSplit> allSplit = new ArrayList<>();
+ private Long maxVal;
+ private Long minVal;
+ private Long batchSize;
+ private Integer batchNum;
+
+ public KuduSourceSplitEnumerator(SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, PartitionParameter partitionParameter) {
+ this.enumeratorContext = enumeratorContext;
+ this.partitionParameter = partitionParameter;
+ }
+
+ @Override
+ public void open() {
+
+ }
+
+ @Override
+ public void run() {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
+
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return 0;
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ int parallelism = enumeratorContext.currentParallelism();
+ if (allSplit.isEmpty()) {
+ if (null != partitionParameter) {
+ Serializable[][] parameterValues = getParameterValues(partitionParameter.minValue, partitionParameter.maxValue, parallelism);
+ for (int i = 0; i < parameterValues.length; i++) {
+ allSplit.add(new KuduSourceSplit(parameterValues[i], i));
+ }
+ } else {
+ allSplit.add(new KuduSourceSplit(null, 0));
+ }
+ }
+ // Filter the split that the current task needs to run
+ List<KuduSourceSplit> splits = allSplit.stream().filter(p -> p.splitId % parallelism == subtaskId).collect(Collectors.toList());
+ enumeratorContext.assignSplit(subtaskId, splits);
+ enumeratorContext.signalNoMoreSplits(subtaskId);
+ }
+
+ private Serializable[][] getParameterValues(Long minVal, Long maxVal, int parallelism) {
+ this.maxVal = maxVal;
+ this.minVal = minVal;
+ long maxElemCount = (maxVal - minVal) + 1;
+ batchNum = parallelism;
+ getBatchSizeAndBatchNum(parallelism);
+ long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
+
+ Serializable[][] parameters = new Serializable[batchNum][2];
+ long start = minVal;
+ for (int i = 0; i < batchNum; i++) {
+ long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
+ parameters[i] = new Long[]{start, end};
+ start = end + 1;
+ }
+ return parameters;
+
+ }
+
+ private void getBatchSizeAndBatchNum(int parallelism) {
+ batchNum = parallelism;
+ long maxElemCount = (maxVal - minVal) + 1;
+ if (batchNum > maxElemCount) {
+ batchNum = (int) maxElemCount;
+ }
+ this.batchNum = batchNum;
+ this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue();
+ }
+
+ @Override
+ public KuduSourceState snapshotState(long checkpointId) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java
new file mode 100644
index 000000000..e79116466
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.source;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class PartitionParameter implements Serializable {
+
+ String partitionColumnName;
+ Long minValue;
+ Long maxValue;
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java
new file mode 100644
index 000000000..317bf375f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.state;
+
+import java.io.Serializable;
+
+public class KuduSourceState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_flink.conf b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_flink.conf
new file mode 100644
index 000000000..b04aeae68
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_flink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 2
+ #job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ KuduSource {
+ result_table_name = "studentlyh2"
+ kudu_master = "192.168.88.110:7051"
+ kudu_table = "studentlyh2"
+ columnsList = "id,name,age,sex"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select id,name,age,sex from studentlyh2"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ kuduSink {
+ kudu_master = "192.168.88.110:7051"
+ kudu_table = "studentlyhresultflink"
+ save_mode="append"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_spark.conf b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_spark.conf
new file mode 100644
index 000000000..cb4ecbaa8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_spark.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 2
+ spark.executor.memory = "1g"
+ spark.master = local
+ #job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ KuduSource {
+ result_table_name = "studentlyh2"
+ kudu_master = "192.168.88.110:7051"
+ kudu_table = "studentlyh2"
+ columnsList = "id,name,age,sex"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+ sql {
+ sql = "select id,name,age,sex from studentlyh2"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+ kuduSink {
+ kudu_master = "192.168.88.110:7051"
+ kudu_table = "studentlyhresult"
+ save_mode="append"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index b5ed52de7..60fd7db0e 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -44,6 +44,7 @@
<module>connector-file</module>
<module>connector-hudi</module>
<module>connector-assert</module>
+ <module>connector-kudu</module>
<module>connector-email</module>
<module>connector-dingtalk</module>
</modules>